Nifi:Nifi中的Controller Service
Service簡介
首先Nifi中的Controller Service 和我們MVC概念中的Controller Service不是一個概念,Nifi中的Controller Service更像是和Processor同級的一個概念,它和Processor在我個人的使用經驗來理解的話就是它是預制好的各種服務,可以被Processor引用或者支撐Processor,例如一個SQL讀取的Processor,它得需要JDBC的連接,才能訪問數據庫。這里Controller Service 就可以是一個JDBC的連接池服務。
同理,Controller Service 也是支持擴展的,可以像自定義開發Processor一樣,根據自己的業務需求,進行自定義的Controller Service 開發。
當我們使用某些依賴Service的組件(Processor)時,在配置中會出現選擇Service或者創建新的Service的情況,這里的Service即是Nifi的Controller Service,一旦創建新的,則會生成一個以Group為范圍的 “全局” Service對象,這時,再有依賴同類型Service的Processor時,可以直接選中:


Controller Service的配置
單獨查看Controller Service 可以從面板空白處,右鍵Configure來看,如下圖:

這是一個JDBC的連接池Service,它包含的屬性有名稱、類型、簡介、啟用狀態、操作;從操作中可以看到配置該Service需要填寫基本的各類屬性;其中,Service是有啟停狀態的,如果想修改Service的屬性內容,必須先保證該Service是停用狀態,然后點擊配置標識,則進入配置頁面,它的配置和Processor的差不多,通過頁簽區別,共有三個頁簽:SETTING(基礎屬性)、PROPERTIES(使用屬性)、COMMENT(頁簽):
SETTING 基礎屬性
基礎屬性,包含左側的名稱,名稱可以進行更改,右側包含引用此Service的Processor 列表
PROPERTIES 使用屬性

核心的業務配置,此標簽頁的配置項根據不同的Service,配置內容不一致,具體的配置項以及使用,可以參考官方的文檔;這里的是JDBC的連接池,所以基本需要連接數據庫所需的URL、數據庫的賬號密碼、數據庫的驅動類名稱、驅動類的依賴 jar包路徑 ,這里不少Service可能都需要第三方的jar包依賴才可以使用,長期使用或生產環境下,建議將所有jar資源集中放在統一路徑下。
COMMENT 頁簽

一個提供Service使用說明的頁簽,可根據自己實際需求,補充使用Service的用法以及描述
Service 的使用范圍
在 nifi的基本使用 中的Group的使用介紹,Group同時也對Services起作用,如果我們在一個Nifi的最外層的平面上 新增Controller Service,那么這些Service的作用域是整個Nifi的任何位置,如果我們在某個Group內創建Controller Service, 那么這個Controller Service 僅在Group范圍內可以被引用,Nifi的這種機制也是方便Service的使用和維護

全局參數配置
類似于 數據庫連接池、Kafka、Redis等各種組件的連接池、客戶端Client的Service在實際的使用中會非常多,由此配置的Service也會非常多,于是就會產生很多次的反復配置URL、賬號這一系列重復的內容,由于Nifi的特性,這些Service又和組件(Processor)一樣,四散在各處,這就使得維護和運維管理變得很繁瑣,調試、調整、查看的時候,要不停的各個group來回跳轉、調整不同的Service的Configure;為應對此類問題,Nifi 提供了全局配置的機制來彌補。
使用變量前:

這里的 URL、Driver Class Name、Database User在實際生產環境中,可能都是固定的數據庫和固定的服務,幾乎不需要變得,可能只需要配置一遍就好,不需要每次創建Service都寫一遍;所以可以這里可以使用上下文變量(Parameter Context)
首先,打開Parameter Context,創新一組新的變量:





之后進入Service 的管控面板(空白處右鍵選擇Configure),先選擇變量組:

再進入 CONTROLLER SERVICES 對Service的配置進行修改,將具體的RUL、Driver-name、user等參數,全部使用變量替換(變量使用‘#’符 )

DBCPConnectionPool的使用樣例
下面將使用Nifi 實現一個簡單的Demo,從Mysql數據庫中讀取部分數據,將數據進行篩選,然后將數據輸出;
首先,使用ExecuteSQL組件,讀取Mysql中的數據,根據上文描述,創建一個DBCPConnectionPool 的Service,然后啟動 :

添加 ExecuteSQL組件,配置相關內容,根據自定義編寫的SQL讀取數據庫內容:

隨后添加 ConvertAvroToJSON 組件,這里從數據庫讀出的數據是不可讀的,為了方便查看調試、同時也是為了后續使用groovy處理數據,所以選擇轉換為JSON進行處理,實際使用可以根據自身情況選擇轉換器:

添加 ExecuteGroovyScript 組件,使用groovy腳本對數據進行處理,groovy的腳本內容如下:

groovy內容:
import org.apache.commons.io.IOUtils; import java.nio.charset.StandardCharsets; import groovy.json.JsonBuilder; import groovy.json.JsonOutput; import groovy.json.JsonSlurper; import groovy.json.StringEscapeUtils; import java.util.*; def dataJson = getInputJSONData() if(null == dataJson){ return; } def rss = [] for(int i = 0; i < dataJson.size();i++){ def tem = dataJson.get(i); //在這里可以對數據進行處理 rss.add(tem.name); } // 輸出 if(rss.size()>0){ sendData(rss,REL_SUCCESS); } /** * 讀取輸入流 * @author GCC ***/ def getInputJSONData(){ def flow = session.get() if(null == flow){ log.error("the flow is null ..."); return; } def dataJson = null; def jsonStr = ""; session.read(flow,{ inputStream -> jsonStr = IOUtils.toString(inputStream, StandardCharsets.UTF_8) } as InputStreamCallback); try{ dataJson = new JsonSlurper().parseText(jsonStr); }catch(Exception e){ log.error("輸入流格式錯誤") } session.remove(flow); return dataJson; } /** *輸出數據至后續管道 *@param result 輸出的數據 *@param outStream 輸出的管道 *@author GCC ***/ void sendData(def result,def outStream){ String successFlowFileStr =StringEscapeUtils.unescapeJavaScript(new JsonOutput().toJson(result).toString()); def newflow = session.create(); newflow = session.write(newflow, { outputStream -> outputStream.write(successFlowFileStr.getBytes(StandardCharsets.UTF_8)) } as OutputStreamCallback) session.transfer(newflow, outStream); }
最后使用LogMessage組件作為接收數據,實際情況可以將數據轉為下一處理節點或存儲等等


在ExcuseGroovyScript組件中使用Service
在 ExcuseGroovyScript 組件內部使用groovy腳本處理數據時,可能需要再次讀取數據庫或者使用其他第三方數據來輔助處理,這時候,ExcuteGroovyScript組件支持可以引入Service,提供用戶編寫的groovy腳本內部使用Service;
首先需要在ExcuteGroovyScript組件的PROPERTIES 配置中新增屬性:

這里,添加屬性時,會讓用戶輸入用戶給該屬性的命名,如果是普通命名,這里的屬性僅僅作為靜態數據而已,但是如果使用關鍵字 ‘SQL.’ 或者 'CTL.'作為名稱前綴時,則能夠使用Service,后續的屬性值則會變成Service的選擇。
在groovy的代碼中,則可以通過 SQL.mysql.{method}的方式,調用Service的方法:
import org.apache.commons.io.IOUtils; import java.nio.charset.StandardCharsets; import groovy.json.JsonBuilder; import groovy.json.JsonOutput; import groovy.json.JsonSlurper; import groovy.json.StringEscapeUtils; import java.util.*; def dataJson = getInputJSONData() if(null == dataJson){ return; } def rss = [] for(int i = 0; i < dataJson.size();i++){ def tem = dataJson.get(i); def mapdic = [:] //使用Service查詢數據庫 SQL.mysql.eachRow("SELECT id,value FROM tb_dic_detail WHERE u_status = 1 "){ row-> mapdic.put(row.id.toString(),row.value.toString());
} rss.add(tem.name); } // 輸出 if(rss.size()>0){ sendData(rss,REL_SUCCESS); } /*****************************************************************公共函數*********************************************************************/ /** * 讀取輸入流 * @author GCC ***/ def getInputJSONData(){ def flow = session.get() if(null == flow){ log.error("the flow is null ..."); return; } def dataJson = null; def jsonStr = ""; session.read(flow,{ inputStream -> jsonStr = IOUtils.toString(inputStream, StandardCharsets.UTF_8) } as InputStreamCallback); try{ dataJson = new JsonSlurper().parseText(jsonStr); }catch(Exception e){ log.error("輸入流格式錯誤") } session.remove(flow); return dataJson; } /** *輸出數據至后續管道 *@param result 輸出的數據 *@param outStream 輸出的管道 *@author GCC ***/ void sendData(def result,def outStream){ String successFlowFileStr =StringEscapeUtils.unescapeJavaScript(new JsonOutput().toJson(result).toString()); def newflow = session.create(); newflow = session.write(newflow, { outputStream -> outputStream.write(successFlowFileStr.getBytes(StandardCharsets.UTF_8)) } as OutputStreamCallback) session.transfer(newflow, outStream); }
1、Nifi:基本認識
2、Nifi:基礎用法及頁面常識
3、Nifi:ExcuseXXXScript組件的使用(一)
4、Nifi:ExcuseXXXScript組件的使用(二)
5、Nifi:ExcuseXXXScript組件的使用(三)
6、Nifi:自定義處理器的開發
7、Nifi:Nifi的Controller Service

浙公網安備 33010602011771號