<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      [源碼分析]從"UDF不應有狀態(tài)" 切入來剖析Flink SQL代碼生成 (修訂版)

      [源碼分析]從"UDF不應有狀態(tài)" 切入來剖析Flink SQL代碼生成 (修訂版)

      0x00 摘要

      "Flink SQL UDF不應有狀態(tài)" 這個技術(shù)細節(jié)可能有些朋友已經(jīng)知道了。但是為什么不應該有狀態(tài)呢?這個恐怕大家就不甚清楚了。本文就帶你一起從這個問題點入手,看看Flink SQL究竟是怎么處理UDF,怎么生成對應的SQL代碼。

      0x01 概述結(jié)論

      先說結(jié)論,后續(xù)一步步給大家詳述問題過程。

      1. 問題結(jié)論

      結(jié)論是:Flink內(nèi)部對SQL生成了java代碼,但是這些java代碼針對SQL做了優(yōu)化,導致在某種情況下,可能 會對 "在SQL中本應只調(diào)用一次" 的UDF 重復調(diào)用

      • 我們在寫SQL時候,經(jīng)常會在SQL中只寫一次UDF,我們認為運行時候也應該只調(diào)用一次UDF。
      • 對于SQL,F(xiàn)link是內(nèi)部解析處理之后,把SQL語句轉(zhuǎn)化為Flink原生算子來處理。大家可以認為是把SQL翻譯成了java代碼再執(zhí)行,這些代碼針對 SQL做了優(yōu)化。
      • 對于UDF,F(xiàn)link也是內(nèi)部生成java代碼來處理,這些代碼也針對SQL做了優(yōu)化。
      • 在Flink內(nèi)部生成的這些代碼中,F(xiàn)link會在某些特定情況下,對 "在SQL中本應只調(diào)用一次" 的UDF 重復調(diào)用
      • Flink生成的內(nèi)部代碼,是把"投影運算"和"過濾條件"分別生成,然后拼接在一起。優(yōu)化后的"投影運算"和"過濾條件"分別調(diào)用了UDF,所以拼接之后就會有多個UDF調(diào)用。
      • 因為實際上編寫時候的一次UDF,優(yōu)化后可能調(diào)用了多次,所以UDF內(nèi)部就不應該有狀態(tài)信息。

      比如:

      1. myFrequency 這個字段是由 UDF_FRENQUENCY 這個UDF函數(shù) 在本步驟生成。
      
      SELECT word, UDF_FRENQUENCY(frequency) as myFrequency FROM TableWordCount
      
      2. 按說下面SQL語句就應該直接取出 myFrequency 即可。因為 myFrequency 已經(jīng)存在了。
      
      SELECT word, myFrequency FROM TableFrequency WHERE myFrequency <> 0
      
      但是因為Flink做了一些優(yōu)化,把 第一個SQL中 UDF_FRENQUENCY 的計算下推到了 第二個SQL。
      
      3. 優(yōu)化后實際就變成了類似這樣的SQL。
      
      SELECT word, UDF_FRENQUENCY(frequency) FROM tableFrequency WHERE UDF_FRENQUENCY(frequency) <> 0
      
      4. 所以UDF_FRENQUENCY就被執(zhí)行了兩次:在WHERE中執(zhí)行了一次,在SELECT中又執(zhí)行了一次。
      

      Flink針對UDF所生成的Java代碼 簡化轉(zhuǎn)義 版如下,能看出來調(diào)用了兩次:

        // 原始 SQL "SELECT word, myFrequency FROM TableFrequency WHERE myFrequency <> 0"
      
          java.lang.Long result$12 = UDF_FRENQUENCY(frequency); // 這次 UDF 調(diào)用對應 WHERE myFrequency <> 0
          
          if (result$12 != 0) { // 這里說明 myFrequency <> 0,于是可以進行 SELECT
            
            // 這里對應的是 SELECT myFrequency,注意的是,按我們一般的邏輯,應該直接復用result$12,但是這里又調(diào)用了 UDF,重新計算了一遍。所以 UDF 才不應該有狀態(tài)信息。
      	    java.lang.Long result$9 = UDF_FRENQUENCY(frequency);  
      
      	    long select;
            
      	    if (result$9 == null) {
      	      select = -1L;
      	    }
      	    else {
      	      select = result$9; // 這里最終 SELECT 了 myFrequency
      	    }
          }
      

      2. 問題流程

      實際上就是Flink生成SQL代碼的流程,其中涉及到幾個重要的節(jié)點舉例如下:

      關(guān)于具體SQL流程,請參見我之前的文章:[源碼分析] 帶你梳理 Flink SQL / Table API內(nèi)部執(zhí)行流程

      // NOTE : 執(zhí)行順序是從上至下, " -----> " 表示生成的實例類型
      * 
      *        +-----> "SELECT xxxxx WHERE UDF_FRENQUENCY(frequency) <> 0" // (SQL statement)
      *        |    
      *        |     
      *        +-----> LogicalFilter (RelNode) // Abstract Syntax Tree,未優(yōu)化的RelNode   
      *        |      
      *        |     
      *    FilterToCalcRule (RelOptRule) // Calcite優(yōu)化rule     
      *        | 
      *        |   
      *        +-----> LogicalCalc (RelNode)  // Optimized Logical Plan,邏輯執(zhí)行計劃
      *        |  
      *        |    
      *    DataSetCalcRule (RelOptRule) // Flink定制的優(yōu)化rule,轉(zhuǎn)化為物理執(zhí)行計劃
      *        |       
      *        |   
      *        +-----> DataSetCalc (FlinkRelNode) // Physical RelNode,物理執(zhí)行計劃
      *        |      
      *        |     
      *    DataSetCalc.translateToPlanInternal  // 作用是生成Flink算子  
      *        |     
      *        |     
      *        +-----> FlatMapRunner (Operator) // In Flink Task   
      *        |     
      *        |    
      

      這里的幾個關(guān)鍵點是:

      • "WHERE UDF_FRENQUENCY(frequency) <> 0" 這部分SQL對應Calcite的邏輯算子是 LogicalFilter
      • LogicalFilter被轉(zhuǎn)換為LogicalCalc,經(jīng)過思考我們可以知道,Filter的Condition條件是需要進行計算才能獲得的,所以需要轉(zhuǎn)換為Calc
      • DataSetCalc中會生成SQL對應的JAVA代碼,這個java類是:DataSetCalcRule extends RichFlatMapFunction。這點很有意思,Flink認為第二條SQL是一個Flatmap操作
      • 為什么UDF對應的第二條SQL是一個Flatmap操作。因為UDF的輸入實際是一個數(shù)據(jù)庫記錄Record,這很像集合;輸出的是數(shù)目不等的幾部分。這恰恰是Flatmap的思想所在

      關(guān)于FlatMap,請參見我之前的文章:[源碼分析] 從FlatMap用法到Flink的內(nèi)部實現(xiàn)

      我們后文中主要就是排查SQL生成流程中哪里出現(xiàn)了這個"UDF多次調(diào)用的問題點"

      0x02 UDX

      1. UDX (自定義函數(shù))

      Flink實時計算支持以下3類自定義函數(shù)

      UDX分類 描述
      UDF(User Defined Function) 用戶自定義標量值函數(shù)(User Defined Scalar Function)。其輸入與輸出是一對一的關(guān)系,即讀入一行數(shù)據(jù),寫出一條輸出值。
      UDAF(User Defined Aggregation Function) 自定義聚合函數(shù),其輸入與輸出是多對一的關(guān)系, 即將多條輸入記錄聚合成一條輸出值。可以與SQL中的GROUP BY語句一起使用。
      UDTF(User Defined Table-valued Function) 自定義表值函數(shù),調(diào)用一次函數(shù)輸出多行或多列數(shù)據(jù)。

      2. 自定義標量函數(shù) Scalar Functions (UDF)

      用戶定義的標量函數(shù)(UDF)將0個、1個或多個標量值映射到一個新的標量值。

      實現(xiàn)一個標量函數(shù)需要繼承ScalarFunction,并且實現(xiàn)一個或者多個evaluation方法。標量函數(shù)的行為就是通過evaluation方法來實現(xiàn)的。evaluation方法必須定義為public,命名為eval。evaluation方法的輸入?yún)?shù)類型和返回值類型決定著標量函數(shù)的輸入?yún)?shù)類型和返回值類型。

      另外 UDF 也有open方法和close方法可選。我們稍后會提到。

      3. 自定義聚合函數(shù)(UDAF)

      自定義聚合函數(shù)(UDAF)將多條記錄聚合成1條記錄。

      聚合函數(shù)需要繼承AggregateFunction。聚合函數(shù)工作方式如下:

      • 首先,需要一個accumulator,這個是保存聚合中間結(jié)果的數(shù)據(jù)結(jié)構(gòu)。調(diào)用AggregateFunction函數(shù)的createAccumulator()方法來創(chuàng)建一個空accumulator.
      • 隨后,每個輸入行都會調(diào)用accumulate()方法來更新accumulator。一旦所有的行被處理了,getValue()方法就會被調(diào)用,計算和返回最終的結(jié)果。

      createAccumulator、getValue 和 accumulate3個方法一起使用,就能設(shè)計出一個最基本的UDAF。但是實時計算一些特殊的場景需要您提供retract和merge兩個方法才能完成。

      4. 自定義表值函數(shù)(UDTF)

      自定義表值函數(shù)(UDTF)與自定義的標量函數(shù)類似,自定義的表值函數(shù)(UDTF)將0個、1個或多個標量值作為輸入?yún)?shù)(可以是變長參數(shù))。與標量函數(shù)不同,表值函數(shù)可以返回任意數(shù)量的行作為輸出,而不僅是1個值。返回的行可以由1個或多個列組成。

      為了自定義表函數(shù),需要繼承TableFunction,實現(xiàn)一個或者多個evaluation方法。表函數(shù)的行為定義在這些evaluation方法內(nèi)部,函數(shù)名為eval并且必須是public。

      UDTF可以通過多次調(diào)用collect()實現(xiàn)將1行的數(shù)據(jù)轉(zhuǎn)為多行返回。

      UDTF不僅可以做到1行轉(zhuǎn)多行,還可以1列轉(zhuǎn)多列。如果您需要UDTF返回多列,只需要將返回值聲明成Tuple或Row。

      5. RichFunction

      RichFunction是Flink提供的一個函數(shù)類的接口,所有Flink函數(shù)類都有其Rich版本。它與常規(guī)函數(shù)的不同在于,可以獲取運行環(huán)境的上下文,并擁有一些生命周期方法,所以可以實現(xiàn)更復雜的功能。

      這里專門提到RichFunction,是因為Flink是把UDF做為RichFunction的一部分來實現(xiàn),即UDF就是RichFunction的成員變量function。所以open, close這兩個函數(shù)就是在RichFunction的相關(guān)同名函數(shù)中被調(diào)用,而eval函數(shù)在RichFunction的業(yè)務函數(shù)中被調(diào)用,比如下文中的function.flatMap就是調(diào)用了 UDF.eval:

        override def flatMap(in: Row, out: Collector[Row]): Unit =
          function.flatMap(in, out) 
      

      沒有相關(guān)經(jīng)驗的同學應該可以深入了解RichFunction用法。

      0x03 實例代碼

      以下是我們的示例程序,后續(xù)就講解這個程序的生成代碼。

      1. UDF函數(shù)

      這里只實現(xiàn)了eval函數(shù),沒有實現(xiàn)open, close。

      import org.apache.flink.table.functions.ScalarFunction;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      
      public class myUdf extends ScalarFunction {
          private Long current = 0L;
          private static final Logger LOGGER = LoggerFactory.getLogger(myUdf.class);
          public Long eval(Long a) throws Exception {
              if(current == 0L) {
                  current = a;
              } else  {
                  current += 1;
              }
              LOGGER.error("The current is : " + current );
              return current;
          }
      }
      

      2. 測試代碼

      import org.apache.flink.api.scala._
      import org.apache.flink.table.api.scala._
      
      object TestUdf {
      
        def main(args: Array[String]): Unit = {
      
          // set up execution environment
          val env = ExecutionEnvironment.getExecutionEnvironment
          val tEnv = BatchTableEnvironment.create(env)
      
          val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
      
          tEnv.registerFunction("UDF_FRENQUENCY", new myUdf())
      
          // register the DataSet as a view "WordCount"
          tEnv.createTemporaryView("TableWordCount", input, 'word, 'frequency)
      
          val tableFrequency = tEnv.sqlQuery("SELECT word, UDF_FRENQUENCY(frequency) as myFrequency FROM TableWordCount")
          tEnv.registerTable("TableFrequency", tableFrequency)
      
          // run a SQL query on the Table and retrieve the result as a new Table
          val table = tEnv.sqlQuery("SELECT word, myFrequency FROM TableFrequency WHERE myFrequency <> 0")
      
          table.toDataSet[WC].print()
        }
      
        case class WC(word: String, frequency: Long)
      }
      

      3. 輸出結(jié)果

      // 輸出如下,能看到本來應該是調(diào)用三次,結(jié)果現(xiàn)在調(diào)用了六次
      
      11:15:05,409 ERROR mytestpackage.myUdf                - The current is : 1
      11:15:05,409 ERROR mytestpackage.myUdf                - The current is : 2
      11:15:05,425 ERROR mytestpackage.myUdf                - The current is : 3
      11:15:05,425 ERROR mytestpackage.myUdf                - The current is : 4
      11:15:05,426 ERROR mytestpackage.myUdf                - The current is : 5
      11:15:05,426 ERROR mytestpackage.myUdf                - The current is : 6
      

      1. 注冊UDF

      實例中,我們使用了registerFunction函數(shù),將UDF注冊到了TableEnvironment之中。

          tEnv.registerFunction("UDF_FRENQUENCY", new myUdf())
      

      TableEnvironment

      TableEnvironment 是Table API和SQL集成的核心概念,它主要負責:

      • 在內(nèi)部目錄Catalog中注冊一個Table,TableEnvironment有一個在內(nèi)部通過表名組織起來的表目錄,Table API或者SQL查詢可以訪問注冊在目錄中的表,并通過名稱來引用它們。
      • 注冊一個外部目錄Catalog
      • 執(zhí)行SQL查詢
      • 注冊一個用戶自定義函數(shù)(標量、表及聚合)
      • 將DataStream或者DataSet轉(zhuǎn)換成Table
      • 持有ExecutionEnvironment或者StreamExecutionEnvironment的引用

      FunctionCatalog

      在Flink中,Catalog是目錄概念,即所有對數(shù)據(jù)庫和表的元數(shù)據(jù)信息都存放再Flink CataLog內(nèi)部目錄結(jié)構(gòu)中,其存放了flink內(nèi)部所有與Table相關(guān)的元數(shù)據(jù)信息,包括表結(jié)構(gòu)信息/數(shù)據(jù)源信息等。

      所有UDF都是注冊在TableEnvImpl.functionCatalog 這個成員變量之中。這是專門存儲 "Table API/SQL函數(shù)定義" 的函數(shù)目錄 (Simple function catalog)。

      FunctionCatalog類具有如下兩個成員變量,都是LinkedHashMap。

      // FunctionCatalog,Table API/SQL function catalog
      public class FunctionCatalog implements FunctionLookup {
      	private final Map<String, FunctionDefinition> tempSystemFunctions = new LinkedHashMap<>();
      	private final Map<ObjectIdentifier, FunctionDefinition> tempCatalogFunctions = new LinkedHashMap<>();
      }
      

      tempCatalogFunctions:對應著SQL語句中的 "CREATE FUNCTION "功能,即Function DDL語法。其主要應用場景如下:

      • 從classpath加載UDF
      CREATE TEMPORARY FUNCTION catalog1.db1.func1 AS ‘com.xxx.udf.func1UDF’ LANGUAGE ’JVM’
      DROP FUNCTION catalog1.db1.geofence
      
      • 從遠程資源加載UDF
      CREATE FUNCTION catalog1.db1.func2 AS ‘com.xxx.udf.func2UDF’ LANGUAGE JVM USING ‘http://artifactory.uber.internal:4587/artifactory/libs-snapshot-local/com/xxx/xxx/xxx-udf/1.0.1-SNAPSHOT/xxx-udf-1.0.1-20180502.011548-12.jar’
      
      • 從遠程資源加載python UDF
      CREATE FUNCTION catalog1.db1.func3 AS ‘com.xxx.udf.func3UDF’ LANGUAGE ‘PYTHON’ USING ‘http://external.resources/flink-udf.py’
      

      tempSystemFunctions :存儲UDX函數(shù),就是本文所要闡述的內(nèi)容。

      經(jīng)過本階段之后,myUdf 這個UDX函數(shù),就做為 "UDF_FRENQUENCY" 注冊到了系統(tǒng)中,可以在后續(xù)的SQL中進行調(diào)用操作

      2. LogicalFilter

      此時,F(xiàn)link已經(jīng)完成了如下操作:

      • SQL 解析階段,生成AST(抽象語法樹)(SQL–>SqlNode)
      • SqlNode 驗證(SqlNode–>SqlNode)
      • 語義分析,生成邏輯計劃(Logical Plan)(SqlNode–>RelNode/RexNode)

      Flink將RelNode串成了一個鏈,具體是由類實例的input完成這個串聯(lián)任務,即input指向本實例的上游輸入。

      LogicalFilter的 input 是 LogicalProject,LogicalProject 的 input 是FlinkLogicalDataSetScan。而FlinkLogicalDataSetScan 的table中就可以知道具體輸入表的信息。

      這個RelNode鏈具體如下。

      == Abstract Syntax Tree ==
      LogicalProject(word=[$0], myFrequency=[$1])
        LogicalFilter(condition=[<>($1, 0)])
          LogicalProject(word=[$0], myFrequency=[UDF_FRENQUENCY($1)])
            FlinkLogicalDataSetScan(ref=[1976870927], fields=[word, frequency])
        
      每一部分都是由 input 指向完成的。
      

      這里的重點是 " myFrequency <> 0" 被轉(zhuǎn)換為 LogicalFilter。這倒是容易理解,因為 WHERE 子句實際就是用來過濾的,所以轉(zhuǎn)換為 LogicalFilter合情合理。

      另外需要注意的是:在構(gòu)建RelNode鏈的時候 ,F(xiàn)link已經(jīng)從TableEnvImpl.functionCatalog 這個成員變量之中提取到了之前注冊的myUdf 這個UDF函數(shù)實例。當需要獲取UDF實例時候,calcite會在 SqlOperatorTable table 中尋找UDF,進而就調(diào)用到了FunctionCatalog.lookupFunction這里,從LinkedHashMap中取得實例。

      具體是SqlToRelConverter函數(shù)中會將SQL語句轉(zhuǎn)換為RelNode,在SqlToRelConverter (org.apache.calcite.sql2rel)完成,其打印內(nèi)容摘要如下:

      filter = {LogicalFilter@4814} "LogicalFilter#2"
       variablesSet = {RegularImmutableSet@4772}  size = 0
       condition = {RexCall@4771} "<>($1, 0)"
       input = {LogicalProject@4770} "LogicalProject#1"
        exps = {RegularImmutableList@4821}  size = 2
        input = {FlinkLogicalDataSetScan@4822} "FlinkLogicalDataSetScan#0"
         cluster = {RelOptCluster@4815} 
         catalog = {CatalogReader@4826} 
         dataSet = {DataSource@4827} 
         fieldIdxs = {int[2]@4828} 
         schema = {RelRecordType@4829} "RecordType(VARCHAR(65536) word, BIGINT frequency)"
         table = {RelOptTableImpl@4830} 
          schema = {CatalogReader@4826} 
          rowType = {RelRecordType@4829} "RecordType(VARCHAR(65536) word, BIGINT frequency)"
      
      展開查看調(diào)用棧
      
      create:107, LogicalFilter (org.apache.calcite.rel.logical)
      createFilter:333, RelFactories$FilterFactoryImpl (org.apache.calcite.rel.core)
      convertWhere:993, SqlToRelConverter (org.apache.calcite.sql2rel)
      convertSelectImpl:649, SqlToRelConverter (org.apache.calcite.sql2rel)
      convertSelect:627, SqlToRelConverter (org.apache.calcite.sql2rel)
      convertQueryRecursive:3181, SqlToRelConverter (org.apache.calcite.sql2rel)
      convertQuery:563, SqlToRelConverter (org.apache.calcite.sql2rel)
      rel:150, FlinkPlannerImpl (org.apache.flink.table.calcite)
      rel:135, FlinkPlannerImpl (org.apache.flink.table.calcite)
      toQueryOperation:490, SqlToOperationConverter (org.apache.flink.table.sqlexec)
      convertSqlQuery:315, SqlToOperationConverter (org.apache.flink.table.sqlexec)
      convert:155, SqlToOperationConverter (org.apache.flink.table.sqlexec)
      parse:66, ParserImpl (org.apache.flink.table.planner)
      sqlQuery:457, TableEnvImpl (org.apache.flink.table.api.internal)
      main:55, TestUdf$ (mytestpackage)
      main:-1, TestUdf (mytestpackage)
      

      3. FilterToCalcRule

      下面是優(yōu)化部分。優(yōu)化規(guī)則分為兩類,一類是Calcite提供的內(nèi)置優(yōu)化規(guī)則(如條件下推,剪枝等),另一類是是將Logical Node轉(zhuǎn)變成 Flink Node 的規(guī)則。

      這里Flink發(fā)現(xiàn)了FilterToCalcRule 這個rule適合對Filter進行切換。

      我們思考下可知,Filter的Condition條件是需要進行計算才能獲得的,所以需要轉(zhuǎn)換為Calc

      具體源碼在 VolcanoPlanner.findBestExp (org.apache.calcite.plan.volcano)

      call = {VolcanoRuleMatch@5576} "rule [FilterToCalcRule] rels [rel#35:LogicalFilter.NONE(input=RelSubset#34,condition=<>($1, 0))]"
       targetSet = {RelSet@5581} 
       targetSubset = null
       digest = "rule [FilterToCalcRule] rels [rel#35:LogicalFilter.NONE(input=RelSubset#34,condition=<>($1, 0))]"
       cachedImportance = 0.891
       volcanoPlanner = {VolcanoPlanner@5526} 
       generatedRelList = null
       id = 45
       operand0 = {RelOptRuleOperand@5579} 
       nodeInputs = {RegularImmutableBiMap@5530}  size = 0
       rule = {FilterToCalcRule@5575} "FilterToCalcRule"
       rels = {RelNode[1]@5582} 
       planner = {VolcanoPlanner@5526} 
       parents = null
      
      展開查看調(diào)用棧
      
      onMatch:65, FilterToCalcRule (org.apache.calcite.rel.rules)
      onMatch:208, VolcanoRuleCall (org.apache.calcite.plan.volcano)
      findBestExp:631, VolcanoPlanner (org.apache.calcite.plan.volcano)
      run:327, Programs$RuleSetProgram (org.apache.calcite.tools)
      runVolcanoPlanner:280, Optimizer (org.apache.flink.table.plan)
      optimizeLogicalPlan:199, Optimizer (org.apache.flink.table.plan)
      optimize:56, BatchOptimizer (org.apache.flink.table.plan)
      translate:280, BatchTableEnvImpl (org.apache.flink.table.api.internal)
      toDataSet:69, BatchTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
      toDataSet:53, TableConversions (org.apache.flink.table.api.scala)
      main:57, TestUdf$ (mytestpackage)
      main:-1, TestUdf (mytestpackage)
      

      4. LogicalCalc

      因為上述的FilterToCalcRule,所以生成了 LogicalCalc。我們也可以看到這里就是包含了UDF_FRENQUENCY

      calc = {LogicalCalc@5632} "LogicalCalc#60"
       program = {RexProgram@5631} "(expr#0..1=[{inputs}], expr#2=[UDF_FRENQUENCY($t1)], expr#3=[0:BIGINT], expr#4=[<>($t2, $t3)], proj#0..1=[{exprs}], $condition=[$t4])"
       input = {RelSubset@5605} "rel#32:Subset#0.LOGICAL"
       desc = "LogicalCalc#60"
       rowType = {RelRecordType@5629} "RecordType(VARCHAR(65536) word, BIGINT frequency)"
       digest = "LogicalCalc#60"
       cluster = {RelOptCluster@5596} 
       id = 60
       traitSet = {RelTraitSet@5597}  size = 1
      

      5. DataSetCalc

      經(jīng)過轉(zhuǎn)換,最后得到了physical RelNode,即物理 RelNode 執(zhí)行計劃 DataSetCalc。

      == Optimized Logical Plan ==
      DataSetCalc(select=[word, UDF_FRENQUENCY(frequency) AS myFrequency], where=[<>(UDF_FRENQUENCY(frequency), 0:BIGINT)])
        DataSetScan(ref=[1976870927], fields=[word, frequency])
      

      具體源碼在 VolcanoPlanner.findBestExp (org.apache.calcite.plan.volcano)。

      // 這里給出了執(zhí)行函數(shù),運行內(nèi)容和調(diào)用棧
        
      ConverterRule.onMatch(RelOptRuleCall call) {
              RelNode rel = call.rel(0);
              if (rel.getTraitSet().contains(this.inTrait)) {
                  RelNode converted = this.convert(rel);
                  if (converted != null) {
                      call.transformTo(converted);
                  }
              }
      }
      
      // 轉(zhuǎn)換后的 DataSetCalc 內(nèi)容如下
      
      converted = {DataSetCalc@5560} "Calc(where: (<>(UDF_FRENQUENCY(frequency), 0:BIGINT)), select: (word, UDF_FRENQUENCY(frequency) AS myFrequency))"
       cluster = {RelOptCluster@5562} 
       rowRelDataType = {RelRecordType@5565} "RecordType(VARCHAR(65536) word, BIGINT myFrequency)"
       calcProgram = {RexProgram@5566} "(expr#0..1=[{inputs}], expr#2=[UDF_FRENQUENCY($t1)], expr#3=[0:BIGINT], expr#4=[<>($t2, $t3)], word=[$t0], myFrequency=[$t2], $condition=[$t4])"
       ruleDescription = "DataSetCalcRule"
       program = {RexProgram@5566} "(expr#0..1=[{inputs}], expr#2=[UDF_FRENQUENCY($t1)], expr#3=[0:BIGINT], expr#4=[<>($t2, $t3)], word=[$t0], myFrequency=[$t2], $condition=[$t4])"
       input = {RelSubset@5564} "rel#71:Subset#5.DATASET"
       desc = "DataSetCalc#72"
       rowType = {RelRecordType@5565} "RecordType(VARCHAR(65536) word, BIGINT myFrequency)"
       digest = "DataSetCalc#72"
       AbstractRelNode.cluster = {RelOptCluster@5562} 
       id = 72
       traitSet = {RelTraitSet@5563}  size = 1
      
      展開查看調(diào)用棧
      
      init:52, DataSetCalc (org.apache.flink.table.plan.nodes.dataset)
      convert:40, DataSetCalcRule (org.apache.flink.table.plan.rules.dataSet)
      onMatch:144, ConverterRule (org.apache.calcite.rel.convert)
      onMatch:208, VolcanoRuleCall (org.apache.calcite.plan.volcano)
      findBestExp:631, VolcanoPlanner (org.apache.calcite.plan.volcano)
      run:327, Programs$RuleSetProgram (org.apache.calcite.tools)
      runVolcanoPlanner:280, Optimizer (org.apache.flink.table.plan)
      optimizePhysicalPlan:209, Optimizer (org.apache.flink.table.plan)
      optimize:57, BatchOptimizer (org.apache.flink.table.plan)
      translate:280, BatchTableEnvImpl (org.apache.flink.table.api.internal)
      toDataSet:69, BatchTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
      toDataSet:53, TableConversions (org.apache.flink.table.api.scala)
      main:57, TestUdf$ (mytestpackage)
      main:-1, TestUdf (mytestpackage)
      

      6. generateFunction (問題點所在)

      在DataSetCalc中,會最后生成UDF對應的JAVA代碼。

      class DataSetCalc {
        
        override def translateToPlan(
            tableEnv: BatchTableEnvImpl,
            queryConfig: BatchQueryConfig): DataSet[Row] = {
      
          ......
          
          // 這里生成了UDF對應的JAVA代碼
          val genFunction = generateFunction(
            generator,
            ruleDescription,
            new RowSchema(getRowType),
            projection,
            condition,
            config,
            classOf[FlatMapFunction[Row, Row]])
      
          // 這里生成了FlatMapRunner
          val runner = new FlatMapRunner(genFunction.name, genFunction.code, returnType)
      
          inputDS.flatMap(runner).name(calcOpName(calcProgram, getExpressionString))
        }  
      }
      
      展開查看調(diào)用棧
      
      translateToPlan:90, DataSetCalc (org.apache.flink.table.plan.nodes.dataset)
      translate:306, BatchTableEnvImpl (org.apache.flink.table.api.internal)
      translate:281, BatchTableEnvImpl (org.apache.flink.table.api.internal)
      toDataSet:69, BatchTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
      toDataSet:53, TableConversions (org.apache.flink.table.api.scala)
      main:57, TestUdf$ (mytestpackage)
      main:-1, TestUdf (mytestpackage)
      

      真正生成代碼的位置如下,能看出來生成代碼是FlatMapFunction。而本文的問題點就出現(xiàn)在這里

      具體原因從下面代碼的注釋中能夠看出:針對本示例代碼,最后是生成了

      • 投射內(nèi)容,就是 SELECT。filterCondition實際上已經(jīng)生成包含了調(diào)用UDF的代碼
      • 過濾條件,就是 WHERE。projection實際上已經(jīng)生成包含了調(diào)用UDF的代碼
      • 生成類的部分代碼,這里對應的是UDF的業(yè)務內(nèi)容,這里就是簡單的把“投射內(nèi)容”和“過濾條件”拼接在一起,并沒有做優(yōu)化,所以就形成了兩個UDF調(diào)用。
      // 下面能看出,針對不同的SQL子句,F(xiàn)link會進行不同的轉(zhuǎn)化
      
      trait CommonCalc {
      
        private[flink] def generateFunction[T <: Function](
            generator: FunctionCodeGenerator,
            ruleDescription: String,
            returnSchema: RowSchema,
            calcProjection: Seq[RexNode],
            calcCondition: Option[RexNode],
            config: TableConfig,
            functionClass: Class[T]):
          GeneratedFunction[T, Row] = {
      
          // 生成投射內(nèi)容,就是 SELECT。filterCondition實際上已經(jīng)生成包含了調(diào)用UDF的代碼,下面會給出其內(nèi)容
          val projection = generator.generateResultExpression(
            returnSchema.typeInfo,
            returnSchema.fieldNames,
            calcProjection)
      
          // only projection
          val body = if (calcCondition.isEmpty) {
            s"""
              |${projection.code}
              |${generator.collectorTerm}.collect(${projection.resultTerm});
              |""".stripMargin
          }
          else {
            // 生成過濾條件,就是 WHERE。filterCondition實際上已經(jīng)生成包含了調(diào)用UDF的代碼,下面會給出其內(nèi)容
            val filterCondition = generator.generateExpression(calcCondition.get)
              
            // only filter
            if (projection == null) {
              s"""
                |${filterCondition.code}
                |if (${filterCondition.resultTerm}) {
                |  ${generator.collectorTerm}.collect(${generator.input1Term});
                |}
                |""".stripMargin
            }
            // both filter and projection
            else {
              // 本例中,會進入到這里。把 filterCondition 和 projection 代碼拼接起來。這下子就有了兩個 UDF 的調(diào)用。
              s"""
                |${filterCondition.code}
                |if (${filterCondition.resultTerm}) {
                |  ${projection.code}
                |  ${generator.collectorTerm}.collect(${projection.resultTerm});
                |}
                |""".stripMargin
            }
          }
      
          // body 是filterCondition 和 projection 代碼的拼接,分別都有 UDF 的調(diào)用,現(xiàn)在就有了兩個UDF調(diào)用了,也就是我們問題所在。
          generator.generateFunction(
            ruleDescription,
            functionClass,
            body,
            returnSchema.typeInfo)
        }
      }
      
      // 此函數(shù)輸入中,calcCondition就是我們SQL的過濾條件
      
      calcCondition = {Some@5663} "Some(<>(UDF_FRENQUENCY($1), 0))"
      
      // 此函數(shù)輸入中,calcProjection就是我們SQL的投影運算條件
        
      calcProjection = {ArrayBuffer@5662} "ArrayBuffer" size = 2
       0 = {RexInputRef@7344} "$0"
       1 = {RexCall@7345} "UDF_FRENQUENCY($1)"
        
      // 生成過濾條件,就是 WHERE 對應的代碼。filterCondition實際上已經(jīng)生成包含了調(diào)用UDF的代碼
        
      filterCondition = {GeneratedExpression@5749} "GeneratedExpression(result$16,isNull$17,\n\n\n\njava.lang.Long result$12 = function_spendreport$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(\n  isNull$8 ? null : (java.lang.Long) result$7);\n\n\nboolean isNull$14 = result$12 == null;\nlong result$13;\nif (isNull$14) {\n  result$13 = -1L;\n}\nelse {\n  result$13 = result$12;\n}\n\n\n\nlong result$15 = 0L;\n\nboolean isNull$17 = isNull$14 || false;\nboolean result$16;\nif (isNull$17) {\n  result$16 = false;\n}\nelse {\n  result$16 = result$13 != result$15;\n}\n,Boolean,false)"
          
      // 生成投影運算,就是 SELECT 對應的代碼。projection也包含了調(diào)用UDF的代碼  
        
      projection = {GeneratedExpression@5738} "GeneratedExpression(out,false,\n\nif (isNull$6) {\n  out.setField(0, null);\n}\nelse {\n  out.setField(0, result$5);\n}\n\n\n\n\n\njava.lang.Long result$9 = function_spendreport$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(\n  isNull$8 ? null : (java.lang.Long) result$7);\n\n\nboolean isNull$11 = result$9 == null;\nlong result$10;\nif (isNull$11) {\n  result$10 = -1L;\n}\nelse {\n  result$10 = result$9;\n}\n\n\nif (isNull$11) {\n  out.setField(1, null);\n}\nelse {\n  out.setField(1, result$10);\n}\n,Row(word: String, myFrequency: Long),false)"
        
      // 具體這個類其實是 DataSetCalcRule extends RichFlatMapFunction 
      name = "DataSetCalcRule"
        
      // 生成的類  
      clazz = {Class@5773} "interface org.apache.flink.api.common.functions.FlatMapFunction"
        
      // 生成類的部分代碼,這里對應的是UDF的業(yè)務內(nèi)容
      bodyCode = "\n\n\n\n\njava.lang.Long result$12 = function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(\n  isNull$8 ? null : (java.lang.Long) result$7);\n\n\nboolean isNull$14 = result$12 == null;\nlong result$13;\nif (isNull$14) {\n  result$13 = -1L;\n}\nelse {\n  result$13 = result$12;\n}\n\n\n\nlong result$15 = 0L;\n\nboolean isNull$17 = isNull$14 || false;\nboolean result$16;\nif (isNull$17) {\n  result$16 = false;\n}\nelse {\n  result$16 = result$13 != result$15;\n}\n\nif (result$16) {\n  \n\nif (isNull$6) {\n  out.setField(0, null);\n}\nelse {\n  out.setField(0, result$5);\n}\n\n\n\n\n\njava.lang.Long result$9 = function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(\n  isNull$8 ? null : (java.lang.Long) result$7);\n\n\nboolean isNull$11 = result$9 == null;\nlong result$10;\nif (isNull$11) {\n  result$10 = -1L;\n}\nelse {\n  result$10 = result$9;\n}\n\n\nif (isNull$11) {\n  out.setField(1, null);\n}\nelse {\n  out.setField(1, result$10);\n}\n\n  c.collect(out);\n}\n"
      
      展開查看調(diào)用棧
      
      generateFunction:94, FunctionCodeGenerator (org.apache.flink.table.codegen)
      generateFunction:79, CommonCalc$class (org.apache.flink.table.plan.nodes)
      generateFunction:45, DataSetCalc (org.apache.flink.table.plan.nodes.dataset)
      translateToPlan:105, DataSetCalc (org.apache.flink.table.plan.nodes.dataset)
      translate:306, BatchTableEnvImpl (org.apache.flink.table.api.internal)
      translate:281, BatchTableEnvImpl (org.apache.flink.table.api.internal)
      toDataSet:69, BatchTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
      toDataSet:53, TableConversions (org.apache.flink.table.api.scala)
      main:57, TestUdf$ (mytestpackage)
      main:-1, TestUdf (mytestpackage)
      

      7. FlatMapRunner

      最后還要重點說明下Flink對于SQL代碼最后的轉(zhuǎn)換包裝。

      前面提到了,F(xiàn)link把UDF做為RichFunction的一部分來實現(xiàn)。事實上,F(xiàn)link是把SQL整條語句轉(zhuǎn)化為一個RichFunction。示例中的兩條SQL語句,分別轉(zhuǎn)換為 RichMapFunction 和 RichFlatMapFunction。具體從下面物理執(zhí)行計劃中可以看出。

      == Physical Execution Plan ==
      Stage 3 : Data Source
      	content : collect elements with CollectionInputFormat
      	Partitioning : RANDOM_PARTITIONED
      
      	Stage 2 : Map
      		content : from: (word, frequency)
      		ship_strategy : Forward
      		exchange_mode : PIPELINED
      		driver_strategy : Map
      		Partitioning : RANDOM_PARTITIONED
      
      		Stage 1 : FlatMap
      			content : where: (<>(UDF_FRENQUENCY(frequency), 0:BIGINT)), select: (word, UDF_FRENQUENCY(frequency) AS myFrequency)
      			ship_strategy : Forward
      			exchange_mode : PIPELINED
      			driver_strategy : FlatMap
      			Partitioning : RANDOM_PARTITIONED
      
      			Stage 0 : Data Sink
      				content : org.apache.flink.api.java.io.DiscardingOutputFormat
      				ship_strategy : Forward
      				exchange_mode : PIPELINED
      				Partitioning : RANDOM_PARTITIONED
      

      我們在org.apache.flink.table.runtime目錄下,可以看到Flink針對每一種 physical RelNode,都定義了一種RichFunction,摘錄如下:

      CRowCorrelateProcessRunner.scala        FlatMapRunner.scala
      CRowMapRunner.scala                     MapJoinLeftRunner.scala
      CRowOutputProcessRunner.scala           MapJoinRightRunner.scala
      CRowProcessRunner.scala                 MapRunner.scala
      CorrelateFlatMapRunner.scala            MapSideJoinRunner.scala
      FlatJoinRunner.scala
      

      實例中第二條SQL語句其類別就是 DataSetCalcRule extends RichFlatMapFunction。從定義能夠看出來,F(xiàn)latMapRunner繼承了RichFlatMapFunction,說明 Flink認為本條SQL就是一個Flatmap操作

      package org.apache.flink.table.runtime
      
      class FlatMapRunner(
          name: String,
          code: String,
          @transient var returnType: TypeInformation[Row])
        extends RichFlatMapFunction[Row, Row] ... {
      
        private var function: FlatMapFunction[Row, Row] = _
      
        ...
      
        override def flatMap(in: Row, out: Collector[Row]): Unit =
          function.flatMap(in, out)
      
        ...
      }
      

      0x05 UDF生成的代碼

      1. 縮減版

      這里是生成的代碼縮減版,能看出具體問題點,myUdf函數(shù)被執(zhí)行了兩次。

      function_mytestpackage\(myUdf\)c45b0e23278f15e8f7d075abac9a121b 這個就是 myUdf 轉(zhuǎn)換之后的函數(shù)。

        // 原始 SQL "SELECT word, myFrequency FROM TableFrequency WHERE myFrequency <> 0"
       
          java.lang.Long result$12 = function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(
            isNull$8 ? null : (java.lang.Long) result$7); // 這次 UDF 調(diào)用對應 WHERE myFrequency <> 0
      
          boolean isNull$14 = result$12 == null; 
          boolean isNull$17 = isNull$14 || false;
          boolean result$16;
          if (isNull$17) {
            result$16 = false;
          }
          else {
            result$16 = result$13 != result$15;
          }
          
          if (result$16) { // 這里說明 myFrequency <> 0,所以可以進入
      	    java.lang.Long result$9 = function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(
      	      isNull$8 ? null : (java.lang.Long) result$7); // 這里對應的是 SELECT myFrequency,注意的是,這里又調(diào)用了 UDF,重新計算了一遍,所以 UDF 才不應該有狀態(tài)信息。 
      	    boolean isNull$11 = result$9 == null;
      	    long result$10;
      	    if (isNull$11) {
      	      result$10 = -1L;
      	    }
      	    else {
      	      result$10 = result$9; // 這里才進行SELECT myFrequency,但是這時候 UDF 已經(jīng)被計算兩次了
      	    }
          }
      

      2. 完整版

      以下是生成的代碼,因為是自動生成,所以看起來會有點費勁,不過好在已經(jīng)是最后一步了。

      public class DataSetCalcRule$18 extends org.apache.flink.api.common.functions.RichFlatMapFunction {
      
        final mytestpackage.myUdf function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b;
      
        final org.apache.flink.types.Row out =
            new org.apache.flink.types.Row(2);
        
        private org.apache.flink.types.Row in1;
      
        public DataSetCalcRule$18() throws Exception {
          
          function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b = (mytestpackage.myUdf)
          org.apache.flink.table.utils.EncodingUtils.decodeStringToObject(
            "rO0ABXNyABFzcGVuZHJlcG9ydC5teVVkZmGYnDRF7Hj4AgABTAAHY3VycmVudHQAEExqYXZhL2xhbmcvTG9uZzt4cgAvb3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuU2NhbGFyRnVuY3Rpb25uLPkGQbqbDAIAAHhyADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5Vc2VyRGVmaW5lZEZ1bmN0aW9u14hb_NiViUACAAB4cHNyAA5qYXZhLmxhbmcuTG9uZzuL5JDMjyPfAgABSgAFdmFsdWV4cgAQamF2YS5sYW5nLk51bWJlcoaslR0LlOCLAgAAeHAAAAAAAAAAAA",
            org.apache.flink.table.functions.UserDefinedFunction.class); 
        }
      
        @Override
        public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
          function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.open(new org.apache.flink.table.functions.FunctionContext(getRuntimeContext()));
        }
      
        @Override
        public void flatMap(Object _in1, org.apache.flink.util.Collector c) throws Exception {
          in1 = (org.apache.flink.types.Row) _in1;
          
          boolean isNull$6 = (java.lang.String) in1.getField(0) == null;
          java.lang.String result$5;
          if (isNull$6) {
            result$5 = "";
          }
          else {
            result$5 = (java.lang.String) (java.lang.String) in1.getField(0);
          }
          
          boolean isNull$8 = (java.lang.Long) in1.getField(1) == null;
          long result$7;
          if (isNull$8) {
            result$7 = -1L;
          }
          else {
            result$7 = (java.lang.Long) in1.getField(1);
          }
      
          java.lang.Long result$12 = function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(
            isNull$8 ? null : (java.lang.Long) result$7);
      
          boolean isNull$14 = result$12 == null;
          long result$13;
          if (isNull$14) {
            result$13 = -1L;
          }
          else {
            result$13 = result$12;
          }
      
          long result$15 = 0L;
          
          boolean isNull$17 = isNull$14 || false;
          boolean result$16;
          if (isNull$17) {
            result$16 = false;
          }
          else {
            result$16 = result$13 != result$15;
          }
          
          if (result$16) {
          
              if (isNull$6) {
                out.setField(0, null);
              }
              else {
                out.setField(0, result$5);
              }
      
              java.lang.Long result$9 = function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(
                isNull$8 ? null : (java.lang.Long) result$7);
      
              boolean isNull$11 = result$9 == null;
              long result$10;
              if (isNull$11) {
                result$10 = -1L;
              }
              else {
                result$10 = result$9;
              }
      
              if (isNull$11) {
                out.setField(1, null);
              }
              else {
                out.setField(1, result$10);
              }
      
                c.collect(out);
              }
        }
      
        @Override
        public void close() throws Exception {  
          function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.close();
        }
      }
      

      0x06 總結(jié)

      至此,我們把Flink SQL如何生成JAVA代碼的流程大致走了一遍。

      Flink生成的內(nèi)部代碼,是把"投影運算"和"過濾條件"分別生成,然后拼接在一起

      即使原始SQL中只有一次UDF調(diào)用,但是如果SELECT和WHERE都間接用到了UDF,那么最終"投影運算"和"過濾條件"就會分別調(diào)用了UDF,所以拼接之后就會有多個UDF調(diào)用。

      這就是 "UDF不應該有內(nèi)部歷史狀態(tài)" 的最終原因。我們在實際開發(fā)過程中一定要注意這個問題。

      0x07 參考

      UDX概述 https://help.aliyun.com/document_detail/69463.html

      posted @ 2020-04-29 23:01  羅西的思考  閱讀(2210)  評論(2)    收藏  舉報
      主站蜘蛛池模板: 成人3D动漫一区二区三区| 猫咪AV成人永久网站在线观看| 亚洲午夜伦费影视在线观看| 国产天美传媒性色av| 国产精品一区二区久久毛片| 久久这里都是精品二| 成在人线av无码免费看网站直播| 欧美一级高清片久久99| 欧美乱妇高清无乱码免费| 免费无码久久成人网站入口| 国产99视频精品免费专区| 成人亚洲性情网站www在线观看| 欧美日本一区二区视频在线观看| 亚洲一二三四区中文字幕| 亚洲区精品区日韩区综合区| 亚洲熟妇自偷自拍另亚洲| 色综合久久久久综合体桃花网| 日韩a∨精品日韩在线观看 | 在线看av一区二区三区| 国内精品亚洲成av人片| 青草视频在线观看视频| 亚洲一二三区精品与老人| 一区二区三区不卡国产| 无码免费大香伊蕉在人线国产| 女同在线观看亚洲国产精品| 性做久久久久久久久| 无码人妻久久一区二区三区app| 遵义市| 国产精品午夜福利免费看| 彰化市| 二区中文字幕在线观看| 久久青青草原亚洲AV无码麻豆| AV老司机AV天堂| 麻豆av一区二区三区| 一边吃奶一边摸做爽视频| 精品国产亚洲午夜精品av| 国产第一页浮力影院入口| 丁香五月激情综合色婷婷| 亚洲天堂男人影院| 国产自在自线午夜精品| 国产精品无遮挡又爽又黄|