<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12
      Fork me on GitHub

      Nifi:Nifi中的Controller Service

      Service簡介

      首先Nifi中的Controller Service 和我們MVC概念中的Controller Service不是一個概念,Nifi中的Controller Service更像是和Processor同級的一個概念,它和Processor在我個人的使用經驗來理解的話就是它是預制好的各種服務,可以被Processor引用或者支撐Processor,例如一個SQL讀取的Processor,它得需要JDBC的連接,才能訪問數據庫。這里Controller Service 就可以是一個JDBC的連接池服務。

      同理,Controller Service 也是支持擴展的,可以像自定義開發Processor一樣,根據自己的業務需求,進行自定義的Controller Service 開發。

      當我們使用某些依賴Service的組件(Processor)時,在配置中會出現選擇Service或者創建新的Service的情況,這里的Service即是Nifi的Controller Service,一旦創建新的,則會生成一個以Group為范圍的 “全局” Service對象,這時,再有依賴同類型Service的Processor時,可以直接選中:

       

       

       

       

       Controller Service的配置

      單獨查看Controller Service 可以從面板空白處,右鍵Configure來看,如下圖:

      這是一個JDBC的連接池Service,它包含的屬性有名稱、類型、簡介、啟用狀態、操作;從操作中可以看到配置該Service需要填寫基本的各類屬性;其中,Service是有啟停狀態的,如果想修改Service的屬性內容,必須先保證該Service是停用狀態,然后點擊配置標識,則進入配置頁面,它的配置和Processor的差不多,通過頁簽區別,共有三個頁簽:SETTING(基礎屬性)、PROPERTIES(使用屬性)、COMMENT(頁簽):

      SETTING 基礎屬性

       基礎屬性,包含左側的名稱,名稱可以進行更改,右側包含引用此Service的Processor 列表

      PROPERTIES 使用屬性

       

      核心的業務配置,此標簽頁的配置項根據不同的Service,配置內容不一致,具體的配置項以及使用,可以參考官方的文檔;這里的是JDBC的連接池,所以基本需要連接數據庫所需的URL、數據庫的賬號密碼、數據庫的驅動類名稱、驅動類的依賴 jar包路徑 ,這里不少Service可能都需要第三方的jar包依賴才可以使用,長期使用或生產環境下,建議將所有jar資源集中放在統一路徑下。

      COMMENT 頁簽

       一個提供Service使用說明的頁簽,可根據自己實際需求,補充使用Service的用法以及描述

      Service 的使用范圍

       在 nifi的基本使用 中的Group的使用介紹,Group同時也對Services起作用,如果我們在一個Nifi的最外層的平面上 新增Controller Service,那么這些Service的作用域是整個Nifi的任何位置,如果我們在某個Group內創建Controller Service, 那么這個Controller Service 僅在Group范圍內可以被引用,Nifi的這種機制也是方便Service的使用和維護

       

      全局參數配置

      類似于 數據庫連接池、Kafka、Redis等各種組件的連接池、客戶端Client的Service在實際的使用中會非常多,由此配置的Service也會非常多,于是就會產生很多次的反復配置URL、賬號這一系列重復的內容,由于Nifi的特性,這些Service又和組件(Processor)一樣,四散在各處,這就使得維護和運維管理變得很繁瑣,調試、調整、查看的時候,要不停的各個group來回跳轉、調整不同的Service的Configure;為應對此類問題,Nifi 提供了全局配置的機制來彌補。

       使用變量前:

      這里的 URL、Driver Class Name、Database User在實際生產環境中,可能都是固定的數據庫和固定的服務,幾乎不需要變得,可能只需要配置一遍就好,不需要每次創建Service都寫一遍;所以可以這里可以使用上下文變量(Parameter Context)

      首先,打開Parameter Context,創新一組新的變量:

       

       

       

       

       

      之后進入Service 的管控面板(空白處右鍵選擇Configure),先選擇變量組:

       

      再進入 CONTROLLER SERVICES 對Service的配置進行修改,將具體的RUL、Driver-name、user等參數,全部使用變量替換(變量使用‘#’符 )

       

      DBCPConnectionPool的使用樣例

      下面將使用Nifi 實現一個簡單的Demo,從Mysql數據庫中讀取部分數據,將數據進行篩選,然后將數據輸出;

      首先,使用ExecuteSQL組件,讀取Mysql中的數據,根據上文描述,創建一個DBCPConnectionPool 的Service,然后啟動 :

      添加 ExecuteSQL組件,配置相關內容,根據自定義編寫的SQL讀取數據庫內容:

      隨后添加 ConvertAvroToJSON 組件,這里從數據庫讀出的數據是不可讀的,為了方便查看調試、同時也是為了后續使用groovy處理數據,所以選擇轉換為JSON進行處理,實際使用可以根據自身情況選擇轉換器:

      添加 ExecuteGroovyScript 組件,使用groovy腳本對數據進行處理,groovy的腳本內容如下:

      groovy內容:

      import org.apache.commons.io.IOUtils;
      import java.nio.charset.StandardCharsets;
      import groovy.json.JsonBuilder;
      import groovy.json.JsonOutput;
      import groovy.json.JsonSlurper;
      import groovy.json.StringEscapeUtils;
      import java.util.*;
      
      
      
      def dataJson = getInputJSONData()
      if(null == dataJson){
          return;
      }
      def rss = []
      for(int i = 0; i < dataJson.size();i++){
          def tem = dataJson.get(i);
         //在這里可以對數據進行處理
          rss.add(tem.name);
      }
      
      // 輸出
      if(rss.size()>0){
          sendData(rss,REL_SUCCESS);
      }
      
      
      
      
      
      /**
       * 讀取輸入流
       * @author GCC
       ***/
      def getInputJSONData(){
          def flow = session.get()
          if(null == flow){
              log.error("the flow is null ...");
              return;
          }
          def dataJson = null;
          def jsonStr = "";
          session.read(flow,{
              inputStream ->
                  jsonStr = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
          } as InputStreamCallback);
          try{
              dataJson = new JsonSlurper().parseText(jsonStr);
          }catch(Exception e){
              log.error("輸入流格式錯誤")
          }
          session.remove(flow);
          return dataJson;
      }
      /**
       *輸出數據至后續管道
       *@param result 輸出的數據
       *@param outStream 輸出的管道
       *@author GCC
       ***/
      void sendData(def result,def outStream){
          String successFlowFileStr =StringEscapeUtils.unescapeJavaScript(new JsonOutput().toJson(result).toString());
          def newflow = session.create();
          newflow = session.write(newflow, {
              outputStream ->
                  outputStream.write(successFlowFileStr.getBytes(StandardCharsets.UTF_8))
          } as OutputStreamCallback)
          session.transfer(newflow, outStream);
      }

      最后使用LogMessage組件作為接收數據,實際情況可以將數據轉為下一處理節點或存儲等等

       

       

      在ExcuseGroovyScript組件中使用Service

       在 ExcuseGroovyScript 組件內部使用groovy腳本處理數據時,可能需要再次讀取數據庫或者使用其他第三方數據來輔助處理,這時候,ExcuteGroovyScript組件支持可以引入Service,提供用戶編寫的groovy腳本內部使用Service;

      首先需要在ExcuteGroovyScript組件的PROPERTIES  配置中新增屬性:

      這里,添加屬性時,會讓用戶輸入用戶給該屬性的命名,如果是普通命名,這里的屬性僅僅作為靜態數據而已,但是如果使用關鍵字 ‘SQL.’ 或者 'CTL.'作為名稱前綴時,則能夠使用Service,后續的屬性值則會變成Service的選擇。

      在groovy的代碼中,則可以通過 SQL.mysql.{method}的方式,調用Service的方法:

      import org.apache.commons.io.IOUtils;
      import java.nio.charset.StandardCharsets;
      import groovy.json.JsonBuilder;
      import groovy.json.JsonOutput;
      import groovy.json.JsonSlurper;
      import groovy.json.StringEscapeUtils;
      import java.util.*;
      
      
      
      def dataJson = getInputJSONData()
      if(null == dataJson){
          return;
      }
      def rss = []
      for(int i = 0; i < dataJson.size();i++){
          def tem = dataJson.get(i);
          def mapdic = [:]
          //使用Service查詢數據庫
          SQL.mysql.eachRow("SELECT id,value FROM tb_dic_detail WHERE u_status = 1 "){
             row->
                 mapdic.put(row.id.toString(),row.value.toString());
      } rss.add(tem.name); }
      // 輸出 if(rss.size()>0){ sendData(rss,REL_SUCCESS); } /*****************************************************************公共函數*********************************************************************/ /** * 讀取輸入流 * @author GCC ***/ def getInputJSONData(){ def flow = session.get() if(null == flow){ log.error("the flow is null ..."); return; } def dataJson = null; def jsonStr = ""; session.read(flow,{ inputStream -> jsonStr = IOUtils.toString(inputStream, StandardCharsets.UTF_8) } as InputStreamCallback); try{ dataJson = new JsonSlurper().parseText(jsonStr); }catch(Exception e){ log.error("輸入流格式錯誤") } session.remove(flow); return dataJson; } /** *輸出數據至后續管道 *@param result 輸出的數據 *@param outStream 輸出的管道 *@author GCC ***/ void sendData(def result,def outStream){ String successFlowFileStr =StringEscapeUtils.unescapeJavaScript(new JsonOutput().toJson(result).toString()); def newflow = session.create(); newflow = session.write(newflow, { outputStream -> outputStream.write(successFlowFileStr.getBytes(StandardCharsets.UTF_8)) } as OutputStreamCallback) session.transfer(newflow, outStream); }

       


       

      1、Nifi:基本認識

      2、Nifi:基礎用法及頁面常識

      3、Nifi:ExcuseXXXScript組件的使用(一)

      4、Nifi:ExcuseXXXScript組件的使用(二)

      5、Nifi:ExcuseXXXScript組件的使用(三)

      6、Nifi:自定義處理器的開發

      7、Nifi:Nifi的Controller Service

      posted @ 2023-07-07 15:28  糖拌西紅柿  閱讀(878)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 天堂…中文在线最新版在线| 成年男女免费视频网站| 成人无码视频| 亚洲一区二区日韩综合久久| 纯肉高h啪动漫| 国色天香成人一区二区| 人人做人人妻人人精| 人成午夜免费大片| 一区二区亚洲人妻精品| 久久精品国产久精国产果冻传媒| 国产日韩入口一区二区| 自拍亚洲一区欧美另类 | 国产午夜精品久久久久免费视| 99在线 | 亚洲| av亚洲在线一区二区| 日韩精品无码一区二区视频 | 高清自拍亚洲精品二区| 九龙城区| 熟女一区| 桃花岛亚洲成在人线AV| 97人洗澡人人澡人人爽人人模| 国产成人无码免费看片软件| 国产日韩精品中文字幕| 国产精品久久精品国产| 国产精品偷乱一区二区三区| 国产欧美日韩亚洲一区二区三区| 激情内射亚洲一区二区三区| аⅴ天堂国产最新版在线中文 | 成年女人片免费视频播放A| 汉源县| 麻豆蜜桃伦理一区二区三区| 精品乱码一区二区三四五区| 上饶县| 国产绿帽在线视频看| 中文字幕有码免费视频| 日韩高清国产中文字幕| 久久91精品牛牛| √天堂中文在线最新版| 国产午夜福利视频合集| 免费看黄色亚洲一区久久| 无码精品人妻一区二区三区中 |