Nifi:nifi內置處理器Processor的開發
本篇主要是介紹自定義處理器的開發方式及Nifi處理器開發的一些細節
Nifi-Processor自定義開發的流程
之前說過,大部分的數據處理,我們可以基于ExcuseGroovyScript處理器,編寫Groovy腳本去完成(或者Jpython,Js腳本等對應的組件),只能說這是基于Nifi平臺的使用層面,還不能算是對于Nifi的開發,真正基于Nifi開發,至少要掌握Nifi擴展自定義組件,即根據實際需求,進行內置處理器(Processor)的開發。當然Nifi本身官方是給出了擴展的口,如果我們下載Nifi源碼,就能看到,在Nifi源碼中有個 nifi-example-bundle 的代碼包,就是Nifi提供的擴展方式,里面附了一個小樣例,感興趣的可以去看Nifi的源碼。這里主要是對開發自定義處理器進行一個完整的經驗總結。
總體來說,Nifi-Processor自定義開發的流程就是:基于Nifi規則下的編碼—>代碼打包為nar包——>發布使用。
基于Nifi規則下的編碼
Nifi本身提供了擴展處理器的方法,并有一套完整的接口、類,只需要按著規則去實現接口、繼承抽象類、覆蓋方法,實現一個自己的Processor類
代碼打包為nar包
Nifi內置Processor的存在形式就是nar包(Nifi自身定義的),必須將自定義的代碼,按著一定的規則進行打包,最終你的Processor是以nar的形式嵌入Nifi中,其實下面的 “自定義Nifi-Processor項目的兩種搭建方式” 主要就是maven項目兩種不同的表現方式,根本目的是為了最終能生成nar包
發布使用
完成開發后,將打好的nar包,放到Nifi安裝目錄的lib下即可,lib目錄本身存放了Nifi出廠自帶的數據處理器nar包,如果想依樣畫葫蘆的學習,可以隨便摘取一個nar包,反編譯一下看看它的寫法。
自定義Nifi-Processor項目的兩種搭建方式
nifi的項目主要是maven項目,我們必須按照maven規范進行開發Nifi的組件,這里經過踩坑,博主總結了兩種Nifi處理器開發的方式:
- 基于Nifi官方提供的bundle模式
- 基于普通maven項目,通過pom文件的方式完成nar包的構建
不管哪種方式,自定義Nifi-Processor組件項目根本的步驟就是:
- 在resources目錄下建立 META-INF.services目錄,然創建文件 org.apache.nifi.processor.Processor
- org.apache.nifi.processor.Processor 文件內聲明自己的Processor類的全路徑
- 將項目配置成能夠打包為nar包的結構
其實這種只是在 “將項目配置成能夠打包為nar包的結構” 上有差異,本質上都是為了打成Nifi推出的nar程序包??傮w上來講,第一種更為規范,更符合項目工程化管控;第二種有點野路子的感覺,就是一個獨立的maven項目,但是好理解、操作方便,建議新手可以從第二種方式入手。
基于官方給出的bundle模式的開發
這種方式是官方給出的,特點就是便于管理和控制開發規范,我們從Nifi給出的樣例碼說起;
下載Nifi源碼后,可以看見 nifi-example-bundle 的代碼包,其實這一代碼包就是標準的自定義Processor開發的模板,自定義創建的時候,可以參考該樣例,創建一個maven項目,進行開發。

它本身是基于maven模塊化的構建,整體的目錄結構是nifi-example-bundle作為一個整體項目,下屬兩個模塊包,nifi-nifi-example-nar和nifi-nifi-example-processors,其中nifi-example-bundle作為一個根項目,nifi-nifi-example-nar里只有一個pom文件,負責將程序打包成nar包,nifi-nifi-example-processors才是真正的靈魂,自定義Processor代碼的存放處。

nifi-example-bundle
整體項目nifi-example-bundle作為一個maven根項目,通過pom文件管控nifi-nifi-example-nar和nifi-nifi-example-processors,其pom文件結構為:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.apache.nifi</groupId> <artifactId>nifi-external</artifactId> <version>1.11.4</version> </parent> <artifactId>nifi-example-bundle</artifactId> <packaging>pom</packaging> <modules> <module>nifi-nifi-example-processors</module> <module>nifi-nifi-example-nar</module> </modules> <dependencyManagement> <dependencies> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-nifi-example-processors</artifactId> <version>1.13.2</version> </dependency> </dependencies> </dependencyManagement> </project>
nifi-nifi-example-processors
nifi-nifi-example-processors項目是核心的,也是我們主要寫代碼的地方,我們要寫的自定義Processor即在這里完成,

1、在resources目錄下建立 META-INF.services,然創建文件 org.apache.nifi.processor.Processor
# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. org.apache.nifi.processors.WriteResourceToStream
2、開發自己的Processor程序,建立一個Processor類,并且繼承實現抽象類 AbstractProcessor,根據自己需要實現一些方法
package org.apache.nifi.processors; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.charset.Charset; import java.util.Collections; import java.util.HashSet; import java.util.Set; import org.apache.commons.io.IOUtils; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.OutputStreamCallback; @Tags({ "example", "resources" }) @CapabilityDescription("This example processor loads a resource from the nar and writes it to the FlowFile content") public class WriteResourceToStream extends AbstractProcessor { public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") .description("files that were successfully processed").build(); public static final Relationship REL_FAILURE = new Relationship.Builder() .name("failure") .description("files that were not successfully processed").build(); private Set<Relationship> relationships; private String resourceData; @Override protected void init(final ProcessorInitializationContext context) { final Set<Relationship> relationships = new HashSet<Relationship>(); relationships.add(REL_SUCCESS); relationships.add(REL_FAILURE); this.relationships = Collections.unmodifiableSet(relationships); final InputStream resourceStream = getClass() .getClassLoader().getResourceAsStream("file.txt"); try { this.resourceData = IOUtils.toString(resourceStream, Charset.defaultCharset()); } catch (IOException e) { throw new RuntimeException("Unable to load resources", e); } finally { IOUtils.closeQuietly(resourceStream); } } @Override public Set<Relationship> getRelationships() { return this.relationships; } @OnScheduled public void onScheduled(final ProcessContext context) { } @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { FlowFile flowFile = session.get(); if (flowFile == null) { return; } try { flowFile = session.write(flowFile, new OutputStreamCallback() { @Override public void process(OutputStream out) throws IOException { IOUtils.write(resourceData, out, Charset.defaultCharset()); } }); session.transfer(flowFile, REL_SUCCESS); } catch (ProcessException ex) { getLogger().error("Unable to process", ex); session.transfer(flowFile, REL_FAILURE); } } }
3、nifi-nifi-example-processors的pom文件樣例:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.apache.nifi</groupId> <artifactId>nifi-example-bundle</artifactId> <version>1.13.2</version> </parent> <artifactId>nifi-nifi-example-processors</artifactId> <packaging>jar</packaging> <dependencies> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-api</artifactId> <version>1.13.2</version> </dependency> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-processor-utils</artifactId> <version>1.13.2</version> </dependency> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-mock</artifactId> <version>1.13.2</version> <scope>test</scope> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.6</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.rat</groupId> <artifactId>apache-rat-plugin</artifactId> <configuration> <excludes> <exclude>src/main/resources/file.txt</exclude> </excludes> </configuration> </plugin> </plugins> </build> </project>
nifi-nifi-example-nar
這一maven只是負責把Processor的jar整合打包成nar包,只有一個pom文件
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.apache.nifi</groupId> <artifactId>nifi-example-bundle</artifactId> <version>1.13.2</version> </parent> <artifactId>nifi-example-nar</artifactId> <packaging>nar</packaging> <properties> <maven.javadoc.skip>true</maven.javadoc.skip> <source.skip>true</source.skip> </properties> <dependencies> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-nifi-example-processors</artifactId> </dependency> </dependencies> </project>
完整的demo代碼可以去github中查看 :https://github.com/GCC1566/Apache-Nifi-Processor/tree/master/nifi-JsonProcessor-bundle
獨立的maven程序模式開發
這種方式很獨立,即當作一個maven項目去建立,不需要考慮過多。
按著maven項目的構建方式,構建一個基本的maven項目:

修改pom.xml文件
這種方式看起來更容易,只是所有的難點都集中在了pom文件,具體的pom文件樣例下面給出,可直接復制到自己的項目中,有額外需要的依賴,自行追加。
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>net.gcc.nifi</groupId> <artifactId>JsonDistributeProcessor</artifactId> <version>1.0</version> <packaging>nar</packaging> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <nifi.version>1.13.2</nifi.version> </properties> <dependencies> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-api</artifactId> <version>${nifi.version}</version> </dependency> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-utils</artifactId> <version>${nifi.version}</version> </dependency> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-processor-utils</artifactId> <version>${nifi.version}</version> </dependency> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-mock</artifactId> <version>${nifi.version}</version> <scope>test</scope> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.nifi</groupId> <artifactId>nifi-nar-maven-plugin</artifactId> <version>1.3.1</version> <extensions>true</extensions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <version>2.18.1</version> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-resources-plugin</artifactId> <version>3.1.0</version> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-install-plugin</artifactId> <version>2.5.2</version> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-deploy-plugin</artifactId> <version>2.8.2</version> </plugin> </plugins> </build> </project>
在項目里建立自己的java文件,實現 AbstractProcessor 類
前置都準備好了,就可以開始編寫自定義的Processor,就隨便建個類,并讓他繼承 AbstractProcessor,然后完成一些方法。
例如:

demo:
package net.gcc.nifi.processors; import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.*; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.util.StringUtils; import java.util.*; import java.util.concurrent.ConcurrentHashMap; /** * JSONDistributeProcessor * According to the rules, distribute the data to the specified Relationship * @author GCC */ @SideEffectFree @Tags({"JsonDataDistribute","net.gcc"}) @CapabilityDescription("Divide data according to configuration") public class JsonDataDistributeProcessor extends AbstractProcessor{ public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") .description("files that were successfully processed").build(); public static final Relationship REL_FAILURE = new Relationship.Builder() .name("failure") .description("files that were not successfully processed").build(); private Set<Relationship> relationships; @Override protected void init(final ProcessorInitializationContext context) { final Set<Relationship> relationships = new HashSet<Relationship>(); relationships.add(REL_SUCCESS); relationships.add(REL_FAILURE); this.relationships = Collections.unmodifiableSet(relationships); } @Override public Set<Relationship> getRelationships() { return this.relationships; } @OnScheduled public void onScheduled(final ProcessContext context) { } @Override public void onTrigger(final ProcessContext context, //具體的數據處理 } }
在resources目錄下建立 META-INF.services,然創建文件 org.apache.nifi.processor.Processor
類實現后,將實現類的全路徑寫在Nifi規定的配置文件種。

org.apache.nifi.processor.Processor文件的內容樣例(可直接復制,只需要改動最后一行即可):
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
net.gcc.nifi.processors.JsonDataDistributeProcessor
這里,最后一行為個人自定義Processor組件的類全路徑,一個maven項目中可以創建多個Processor,只需要在這里追加。
完整的demo代碼可以去github中查看 :https://github.com/GCC1566/JsonDistributeProcessor
關于Nifi-Processor開發的一些知識
FlowFile
FlowFile是一種邏輯概念,它將一段數據與一組關于該數據的屬性相關聯。這些屬性包括FlowFile的唯一標識符,以及其名稱,大小和任何數量的其他特定于流的值。雖然FlowFile的內容和屬性可以更改,但FlowFile對象是不可變的。ProcessSession可以對FlowFile進行修改。
FlowFiles的核心屬性在org.apache.nifi.flowfile.attributes.CoreAttributes枚舉中定義。您將看到的最常見屬性是filename,path和uuid。引號中的字符串是CoreAttributes枚舉中屬性的值。
-
Filename(“filename”):FlowFile的文件名。文件名不應包含任何目錄結構。
-
UUID(“uuid”):分配給此FlowFile的通用唯一標識符,用于區分FlowFile與系統中的其他FlowFiles。
-
Path(“path”):FlowFile的路徑指示FlowFile所屬的相對目錄,不包含文件名。
-
Absolute Path (“absolute.path”):FlowFile的絕對路徑表示FlowFile所屬的絕對目錄,不包含文件名。
-
Priority(“priority”):表示FlowFile優先級的數值。
-
MIME Type(“mime.type”):此FlowFile的MIME類型。
-
Discard Reason(“discard.reason”):指定丟棄FlowFile的原因。
-
Alternative Identifier(“alternate.identifier”):表示已知引用此FlowFile的FlowFile的UUID以外的標識符。
ProcessSession
ProcessSession通常簡稱為“會話”,它提供了一種機制,通過該機制可以創建,銷毀,檢查,克隆FlowFiles并將其傳輸到其他處理器。此外,ProcessSession還提供了通過添加或刪除屬性或修改FlowFile內容來創建FlowFiles的修改版本的機制。ProcessSession還公開了一種用于發布源代碼事件的機制,該機制提供了跟蹤FlowFile的沿襲和歷史的能力。在一個或多個FlowFiles上執行操作后,可以提交或回滾ProcessSession。
ProcessorInitializationContext
創建處理器后,initialize將使用InitializationContext對象調用其方法。此對象向處理器公開配置,該配置在處理器的整個生命周期內不會更改,例如處理器的唯一標識符。
ProcessContext
ProcessContext提供了處理器和框架之間的橋梁。它提供有關處理器當前如何配置的信息,并允許處理器執行特定于Framework的任務,例如產生其資源,以便框架將安排其他處理器運行而不會不必要地消耗資源。
PropertyDescriptor
PropertyDescriptor定義將由Processor,ReportingTask或ControllerService使用的屬性。屬性的定義包括其名稱,屬性的描述,可選的默認值,驗證邏輯,以及關于處理器是否有效所需的屬性的指示符。PropertyDescriptors是通過實例化PropertyDescriptor.Builder 類的實例,調用適當的方法來填充有關屬性的詳細信息,最后調用該build方法來創建的。
Validator(驗證器)
PropertyDescriptor必須指定一個或多個Validator,可用于確保用戶輸入的屬性值有效。如果Validator指示屬性值無效,則在屬性生效之前,將無法運行或使用Component。如果未指定Validator,則假定Component無效,NiFi將報告該屬性不受支持。
Relationship
關系定義FlowFile可以從處理器傳輸到的路由。通過實例化Relationship.Builder 類的實例,調用適當的方法來填充關系的細節,最后調用 build方法來創建關系。
ComponentLog
鼓勵處理器通過ComponentLog接口執行日志記錄 ,而不是獲取第三方記錄器的直接實例。這是因為通過ComponentLog進行日志記錄允許框架將超出可配置嚴重性級別的日志消息呈現給用戶界面,從而允許在發生重要事件時通知監視數據流的人員。此外,它通過在DEBUG模式下記錄堆棧跟蹤并在日志消息中提供處理器的唯一標識符,為所有處理器提供一致的日志記錄格式。
Processor的職責和一些基本概念
我們來看一個自定義Processor的繼承結構:

圖中,Tags,SideEffectFree、CapabilityDescription,是基于注解的,都是為了UI頁面上的展示,這里我們忽略不看,主要看它的主線,整個自定義Processor繼承自AbstractProcessor,而AbstractProcessor又繼承了一系列。這里說明一下,雖然Processor是一個可以直接實現的接口,但這樣做非常罕見,因為它org.apache.nifi.processor.AbstractProcessor是幾乎所有處理器實現的基類。AbstractProcessor類提供的功能的顯著,這使得開發的處理器更容易,更方便的任務。對于本文檔的范圍,我們將主要關注AbstractProcessor處理Processor API時的類。
處理器AbstractProcessor
處理器有很多方法,關乎著處理器的加載、運行、處理數據等等,這里只介紹幾個最為重要的。
init()
該方法在處理器初始化的時候被調用,該方法采用單個參數,即類型ProcessorInitializationContext。上下文對象為Processor提供ComponentLog,Processor的唯一標識符和ControllerServiceLookup,可用于與配置的ControllerServices交互。每個這樣的對象是由AbstractProcessor存儲,并且可以由子類經由獲得getLogger,getIdentifier和 getControllerServiceLookup方法。
getRelationships()
處理器通過覆蓋該getRelationships方法來公開有效的關系集 。這個方法沒有參數,并返回Set的Relationship 對象。對于大多數處理器,此Set將是靜態的,但其他處理器將根據用戶配置動態生成Set。對于Set為靜態的那些處理器,建議在Processor的構造函數或init方法中創建一個不可變的Set并返回該值,而不是動態生成Set。這種模式有助于實現更清晰的代碼和更好的性能。
getSupportedPropertyDescriptors()
大多數處理器在能夠使用之前需要一些用戶配置。處理器支持的屬性通過該getSupportedPropertyDescriptors方法向頁面的組件公開 。這個方法沒有參數,并返回List的 PropertyDescriptor對象。List中對象的順序很重要,因為它決定了在用戶界面中呈現屬性的順序。
PropertyDescriptor目的是通過創建一個新的實例構造PropertyDescriptor.Builder對象,調用構建器的適當的方法,并最終調用build方法。
雖然此方法涵蓋了大多數用例,但有時需要允許用戶配置名稱未知的其他屬性。這可以通過覆蓋該getSupportedDynamicPropertyDescriptor方法來實現 。此方法將 String唯一參數作為參數,該參數指示屬性的名稱。該方法返回一個PropertyDescriptor對象,該 對象可用于驗證屬性的名稱以及值。應該構建從此方法返回的任何PropertyDescriptor,isDynamic在PropertyDescriptor.Builder類中將值設置為true 。AbstractProcessor的默認行為是不允許任何動態創建的屬性。
然后處理器的屬性,是需要驗證的,如果處理器的配置無效,則無法啟動處理器。可以通過在PropertyDescriptor上設置Validator或通過PropertyDescriptor.Builder的allowableValues方法或identifiesControllerService方法限制屬性的允許值來驗證Processor屬性。
但是,有時候單獨驗證處理器的屬性是不夠的。為此,AbstractProcessor公開了一個customValidate方法。該方法采用單個參數類型ValidationContext。此方法的返回值是描述驗證期間發現的任何問題Collection的 ValidationResult對象。只應返回其isValid方法返回的ValidationResult對象 false。僅當所有屬性根據其關聯的Validators和Allowable Values有效時,才會調用此方法。即,只有當所有屬性本身都有效時才會調用此方法,并且此方法允許整體驗證處理器的配置。
onPropertyModified()
這方法總的作用就是:響應配置更改,也就是當頁面組件的配置標簽頁里配置發生變化,該方法就會被執行一次。當用戶更改Processor的屬性值時,onPropertyModified將為每個已修改的屬性調用該 方法。該方法有三個參數:PropertyDescriptor,它指示修改了哪個屬性,舊值和新值。如果屬性沒有先前的值,則第二個參數將是null。如果刪除了屬性,則第三個參數將是null。重要的是要注意,無論值是否有效,都將調用此方法。只有在實際修改了值時才會調用此方法,而不是在用戶更新處理器而不更改其值時調用此方法。在調用此方法時,保證調用此方法的線程是當前在Processor中執行代碼的唯一線程,除非Processor本身創建自己的線程。
onTrigger()
當處理器有工作要做時,它計劃onTrigger通過框架調用其方法來完成。該方法有兩個參數:a ProcessContext和aProcessSession。該onTrigger方法的第一步通常是通過調用getProcessSession上的一個方法來獲取要在其上執行工作的FlowFile 。對于從外部源將數據提取到NiFi的處理器,將跳過此步驟。然后,處理器可以自由檢查FlowFile屬性; 添加,刪除或修改屬性; 讀取或修改FlowFile內容; 并將FlowFiles傳輸到適當的關系。
處理器被觸發時(When Processors are Triggered)
onTrigger只有在計劃運行處理器并且處理器存在工作時,才會調用處理器的方法。如果滿足以下任何條件,則稱處理器存在工作:
-
-
目標為Processor的Connection在其隊列中至少有一個FlowFile
-
處理器沒有傳入的連接
-
處理器使用@TriggerWhenEmpty批注進行批注
-
有幾個因素會導致onTrigger調用Processor的 方法。首先,除非用戶已將處理器配置為運行,否則不會觸發處理器。如果計劃運行處理器,則周期性地(該周期由用戶界面中的用戶配置)檢查處理器是否有工作,如上所述。如果是這樣,框架將檢查處理器的下游目的地。如果處理器的任何出站連接已滿,則默認情況下,將不會安排處理器運行。
但是,@TriggerWhenAnyDestinationAvailable注釋可以添加到Processor的類中。在這種情況下,需求被更改,以便只有一個下游目標必須“可用”(如果連接的隊列未滿,則目標被視為“可用”),而不是要求所有下游目標都可用。
與處理器調度有關的還有@TriggerSerially 注釋。使用此Annotation的處理器永遠不會有多個線程onTrigger同時運行該方法。但是,必須注意,執行代碼的線程可能會從調用更改為調用。因此,仍然必須注意確保處理器是線程安全的!
自定義Nifi-Processor組件的單元測試
處理器或控制器服務的大多數單元測試都是通過創建TestRunner 類的實例來完成的。
一般需要額外引入maven依賴,由apache-nifi官方提供
<dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-mock</artifactId> <version>${nifi version}</version> </dependency>
TestRunner
TestRunner類是Nifi專門用來模擬實際環境下,運行Processor或者ControllerService的一個接口,它的實例可以通過 TestRunner.newTestRunner()來創建,newTestRunner方法的參數是你要運行的Proccessor的類;例如
TestRunner run = TestRunner.newTestRunner(JSONDistributeProcessor.class);
添加ControllerServices
在啟動一個處理器的時候,有些處理器可能需要額外需要ControllerService才能正常運行,這時候,可以通過模擬追加ControllerService來完成。
它的性質可以通過調用被設置:
setProperty(ControllerService, PropertyDescriptor, String),
setProperty(ControllerService, String, String)
setProperty(ControllerService, PropertyDescriptor, AllowableValue)
任何一個來完成。每種方法都返回一個 ValidationResult。然后可以檢查此對象以確保通過調用該屬性有效isValid。可以通過調用setAnnotationData(ControllerService, String)方法來設置注釋數據。
assertValid(ControllerService) 方法來模擬驗證ControllerService有效
assertNotValid(ControllerService) 來模擬驗證ControllerService無效
將Controller Service添加到Test Runner并進行配置后,通過調用:
enableControllerService(ControllerService)方法啟用它 。
如果Controller Service無效,則此方法將拋出IllegalStateException。
設置屬性值
每個處理器可能需要具備一定的屬性配置,也就是Web頁面中每個Processor的配置頁面
這里可以通過:
setProperty(PropertyDescriptor, String)方法進行追加配置
每個setProperty方法再次返回一個ValidationResult屬性,可用于確保屬性值有效。
預制FlowFiles測試數據
然后模擬前置管道涌入的數據,待測數據,可以通過TestRunner的enqueue方法來模擬批量的FlowFile數據,本身enqueue方法本身支持幾種輸入:byte[] ,InputStram、Path、Map
當然也可以自行實現enqueue方法,來滿足自身的擴展需要
運行處理器
配置Controller Services并將必要的FlowFile排入隊列后,可以通過調用run方法來觸發處理器運行TestRunner。如果在沒有任何參數的情況下調用此方法,它將使用@OnScheduled注釋調用Processor中的任何方法,調用Processor的onTrigger方法一次,然后運行@OnUnscheduledfinally @OnStopped方法。
如果希望在觸發onTrigger其他事件@OnUnscheduled和 @OnStopped生命周期事件之前運行該方法的多次迭代,則該run(int)方法可用于指定現在onTrigger應該調用的許多迭代。
還有,當我們想要觸發處理器上運行,但不會觸發時間@OnUnscheduled和@OnStopped 生命周期事件。例如,這有助于在這些事件發生之前檢查處理器的狀態。這可以使用run(int, boolean)和傳遞false作為第二個參數來實現。但是,在執行此操作后,調用@OnScheduled生命周期方法可能會導致問題。因此,我們現在可以onTrigger再次運行,而不會通過使用方法的run(int,boolean,boolean)版本run并false作為第三個參數傳遞來發生這些事件。
如果測試多個線程發生的行為很有用,這也可以通過調用setThreadCount方法來實現 TestRunner。默認值為1個線程。如果使用多個線程,請務必記住,run調用TestRunner指定應觸發處理器的次數,而不是每個線程應觸發處理器的次數。因此,如果線程計數設置為2但 run(1)被調用,則只使用一個線程。
驗證輸出Relationship
處理器運行完畢后,單元測試通常需要驗證FlowFiles是否符合預期要求。通過以下兩種方法來查看:
TestRunnersassertAllFlowFilesTransferred():此方法將關系和整數作為參數,以指示應該將多少FlowFiles傳輸到該關系。除非將此數量的FlowFiles轉移到給定的關系或者任何FlowFile被轉移到任何其他關系,否則該方法將無法通過單元測試。
assertTransferCount():僅驗證FlowFile計數是給定關系的預期數量。
如果想要獲取實際輸出的數據樣例,通過以下方法:
getFlowFilesForRelationship(): 獲得實際的輸出FlowFiles 。這個方法返回一個List<MockFlowFile>。重要的是要注意List的類型MockFlowFile,而不是FlowFile接口。這樣做是因為MockFlowFile有許多方法可以驗證內容。
一個實際例子
public class JsonDataDistributeProcessorTest { private TestRunner runner = TestRunners.newTestRunner(new JsonDataDistributeProcessor()); @Test public void runProcessor(){ runner.setProperty("zhangsan","{\n" + " \"logic\":\"&&\",\n" + " \"fields\":[\n" + " {\n" + " \"field\":\"name\",\n" + " \"value\":[\"張三\"]\n" + " },\n" + " {\n" + " \"field\":\"age\",\n" + " \"value\":[12]\n" + " }\n" + " ]\n" + "}"); runner.setProperty("lisi","{\n" + " \"logic\":\"&&\",\n" + " \"fields\":[\n" + " {\n" + " \"field\":\"name\",\n" + " \"value\":[\"張三\"]\n" + " }" + " ]\n" + "}"); runner.assertValid(); runner.enqueue("[{\"name\":\"張三\",\"age\":12},\n" + "{\"name\":\"王三\",\"age\":12},\n" + "{\"name\":\"張三\",\"age\":16},\n" + "{\"sdf\":\"wn\",\"age\":18}\n" + "]"); runner.run(); List<MockFlowFile> zhangsanlist = runner.getFlowFilesForRelationship("zhangsan"); List<MockFlowFile> lisi = runner.getFlowFilesForRelationship("lisi"); for(MockFlowFile mk:zhangsanlist) { System.out.println(mk.toString()); } for(MockFlowFile mk:lisi) { System.out.print(mk.toString()); } } }
1、Nifi:基本認識
2、Nifi:基礎用法及頁面常識
3、Nifi:ExcuseXXXScript組件的使用(一)
4、Nifi:ExcuseXXXScript組件的使用(二)
5、Nifi:ExcuseXXXScript組件的使用(三)
6、Nifi:自定義處理器的開發
7、Nifi:Nifi的Controller Service

浙公網安備 33010602011771號