
      [Source Code Analysis] Walking You Through the Internal Execution Flow of Flink SQL / Table API

      0x00 Abstract

      This article gives an overview of the internal implementation of Flink SQL / Table API and strings together the whole flow "from SQL statement to actual execution". It also provides as many call stacks as possible, so that when you run into a problem you know where to set breakpoints, and so that your understanding of the overall architecture becomes deeper.

      Several important nodes involved in the SQL flow are illustrated below:

      // NOTE : execution order is top to bottom; " -----> " marks the type of instance produced
      * 
      *        +-----> "left outer JOIN" (SQL statement)
      *        |   
      *        |     
      *     SqlParser.parseQuery // SQL parsing phase: generate the AST (abstract syntax tree); SQL -> SqlNode
      *        |   
      *        |      
      *        +-----> SqlJoin (SqlNode)
      *        |   
      *        |     
      *     SqlToRelConverter.convertQuery // semantic analysis: generate the logical plan; SqlNode -> RelNode
      *        |    
      *        |     
      *        +-----> LogicalProject (RelNode) // Abstract Syntax Tree, unoptimized RelNode
      *        |      
      *        |     
      *    FlinkLogicalJoinConverter (RelOptRule) // Flink-specific optimization rules
      *    VolcanoRuleCall.onMatch // optimize the Logical Plan with Flink's own optimization rules
      *        | 
      *        |   
      *        +-----> FlinkLogicalJoin (RelNode)  // Optimized Logical Plan, the logical execution plan
      *        |  
      *        |    
      *    StreamExecJoinRule (RelOptRule) // Rule that converts FlinkLogicalJoin without window bounds in join condition to StreamExecJoin
      *    VolcanoRuleCall.onMatch // convert the optimized LogicalPlan into Flink's physical execution plan
      *        |       
      *        |   
      *        +-----> StreamExecJoin (FlinkRelNode) // Stream physical RelNode, the physical execution plan
      *        |      
      *        |     
      *    StreamExecJoin.translateToPlanInternal  // generates the StreamOperator, i.e. the Flink operator
      *        |     
      *        |     
      *        +-----> StreamingJoinOperator (StreamOperator) // Streaming unbounded Join operator in StreamTask
      *        |     
      *        |       
      *    StreamTwoInputProcessor.processRecord1 // invoked inside TwoInputStreamTask; this is where the StreamingJoinOperator actually runs
      *        |
      *        |  
      

      The rest of the article follows this diagram as its backbone.

      0x01 Apache Calcite

      Flink Table API & SQL provide a unified interface for relational queries over streaming and static data, and they build on Apache Calcite's query optimization framework and SQL parser.

      Why does Flink offer a Table API at all? In short, a relational API has the following benefits (a small sketch follows this list):

      • A relational API is declarative
      • Queries can be effectively optimized
      • Queries can be executed efficiently
      • "Everybody" knows SQL
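
      To make the declarative flavour concrete, here is a minimal sketch (assuming a Flink 1.10-era setup with the Blink planner; the "Orders" table and its columns are illustrative and would have to be registered first) that expresses the same filter once as SQL text and once through the Table API; both paths end up as the same relational plan inside Calcite:

      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.table.api.EnvironmentSettings;
      import org.apache.flink.table.api.Table;
      import org.apache.flink.table.api.java.StreamTableEnvironment;

      public class RelationalApiSketch {
          public static void main(String[] args) {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              EnvironmentSettings settings =
                      EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
              StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

              // the same query, declaratively, in two flavours
              Table viaSql = tEnv.sqlQuery("SELECT product, amount FROM Orders WHERE amount > 2");
              Table viaTableApi = tEnv.scan("Orders").filter("amount > 2").select("product, amount");
          }
      }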

      Calcite is the core component here. Apache Calcite is a SQL engine originally aimed at the Hadoop ecosystem; it provides standard SQL, a variety of query optimizations, and the ability to connect to many kinds of data sources.

      1. Calcite Concepts

      Here is an overview of the Calcite concepts:

      • Relational algebra: relational expressions, usually named after verbs, e.g. Sort, Join, Project, Filter, Scan, Sample.
      • Traits: expressions carry various traits; a trait's satisfies() method tests whether an expression satisfies a given trait or convention.
      • Rules: used to transform one expression into another. A rule carries a list of RelOptRuleOperands that decides whether it can be applied to a given part of the tree.
      • Planner: the optimizer. Based on a set of rules and a cost model (e.g. the cost-based VolcanoPlanner or the heuristic HepPlanner), it rewrites an expression into a semantically equivalent but more efficient one.
      • RelNode: represents one processing operation on the data; common operations are Sort, Join, Project, Filter, Scan, etc. It describes an operation on the whole relation, not the processing logic for individual rows. A RelNode records its input RelNodes, which is how a RelNode tree is formed.
      • RexNode: a row expression (scalar expression) describing the processing logic for a single row of data. Every row expression has a data type, because during validation the compiler infers the result type of each expression. Common row expressions are literals (RexLiteral), variables (RexVariable), and function or operator calls (RexCall). RexNodes are built through a RexBuilder (a small sketch follows this list).
      • RelTrait: defines the physical properties of a logical table; the three main trait types are Convention, RelCollation, and RelDistribution.
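
      As a small illustration of the RexNode / RexBuilder concepts above, here is a minimal, Flink-independent sketch (the type factory and the field index 2 are assumptions for illustration) that builds the row expression "amount > 2":

      import java.math.BigDecimal;

      import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
      import org.apache.calcite.rel.type.RelDataType;
      import org.apache.calcite.rel.type.RelDataTypeFactory;
      import org.apache.calcite.rex.RexBuilder;
      import org.apache.calcite.rex.RexNode;
      import org.apache.calcite.sql.fun.SqlStdOperatorTable;
      import org.apache.calcite.sql.type.SqlTypeName;

      public class RexNodeSketch {
          public static void main(String[] args) {
              RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl();
              RexBuilder rexBuilder = new RexBuilder(typeFactory);

              RelDataType intType = typeFactory.createSqlType(SqlTypeName.INTEGER);
              RexNode amountRef = rexBuilder.makeInputRef(intType, 2);            // $2, e.g. the "amount" column
              RexNode two = rexBuilder.makeExactLiteral(BigDecimal.valueOf(2));   // literal 2
              // amount > 2 -- note that every RexNode carries its result type (BOOLEAN here)
              RexNode predicate = rexBuilder.makeCall(SqlStdOperatorTable.GREATER_THAN, amountRef, two);
              System.out.println(predicate + " : " + predicate.getType());
          }
      }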

      2. Calcite Processing Flow

      SQL execution can generally be divided into four phases; Calcite is very similar, but splits the work into five phases (a minimal end-to-end sketch of the first three phases follows this list):

      1. SQL parsing: generate the AST (abstract syntax tree) (SQL -> SqlNode)

      2. SqlNode validation (SqlNode -> SqlNode)

      3. Semantic analysis: generate the logical plan (Logical Plan) (SqlNode -> RelNode/RexNode)

      4. Optimization: optimize according to the corresponding rules (RelNode -> RelNode)

      5. Generate the ExecutionPlan, i.e. the physical execution plan (DataStream plan)
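
      The first three phases can be reproduced outside Flink with Calcite's Planner facade. The following is a minimal sketch under illustrative assumptions (the schema is empty, so an "Orders" table would have to be added to the root schema before validation could succeed):

      import org.apache.calcite.plan.RelOptUtil;
      import org.apache.calcite.rel.RelRoot;
      import org.apache.calcite.schema.SchemaPlus;
      import org.apache.calcite.sql.SqlNode;
      import org.apache.calcite.tools.FrameworkConfig;
      import org.apache.calcite.tools.Frameworks;
      import org.apache.calcite.tools.Planner;

      public class CalcitePhasesSketch {
          public static void main(String[] args) throws Exception {
              SchemaPlus rootSchema = Frameworks.createRootSchema(true);
              FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema(rootSchema).build();
              Planner planner = Frameworks.getPlanner(config);

              SqlNode parsed = planner.parse("SELECT * FROM Orders WHERE amount > 2"); // phase 1: SQL -> SqlNode
              SqlNode validated = planner.validate(parsed);                            // phase 2: SqlNode -> SqlNode
              RelRoot root = planner.rel(validated);                                   // phase 3: SqlNode -> RelNode
              System.out.println(RelOptUtil.toString(root.rel));
          }
      }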

      1. How Flink's Relational APIs Are Executed

      Flink carries two ways of expressing queries: the Table API and the SQL API. It uses Apache Calcite as the SQL parser for semantic analysis, uniformly producing a Calcite logical plan (a SqlNode tree), which is then validated. Calcite's optimizer then applies transformation rules to the logical plan, using different rule sets depending on the nature of the data source (streaming or batch), and optimizes it into a RelNode logical execution plan tree. Finally the optimized plan is translated into a regular Flink DataSet or DataStream program. Any performance improvement to the DataStream API or DataSet API therefore automatically improves the efficiency of Table API and SQL queries.

      From submission, through Calcite parsing and optimization, to execution by the Flink engine, a streaming SQL statement generally goes through the following stages:

      1. Sql Parser: the SQL statement is parsed with JavaCC into an AST (syntax tree), represented in Calcite by SqlNode;
      2. Sql Validator: the SQL is validated against the catalog (the data dictionary);
      3. Generate the Logical Plan: the AST represented by SqlNode is converted into a logical plan, represented by RelNode;
      4. Generate the optimized LogicalPlan: the logical plan is first optimized with Calcite rules, then with Flink's own optimization rules;
      5. Generate the Flink PhysicalPlan: again based on Flink rules, the optimized logical plan is converted into Flink's physical execution plan;
      6. Convert the physical execution plan into a Flink ExecutionPlan: the corresponding translateToPlan methods are called and code generation (CodeGen) produces the various Flink operators.

      If the job is submitted through the Table API instead, it also goes through Calcite optimization and the other stages; the basic flow is similar to running SQL directly:

      1. table api parser: Flink also represents the computation expressed through the Table API as a tree, using treeNodes;
        the computation logic of each node in the tree is represented by an Expression.
      2. Validate: each node's Unresolved Expression is bound against the catalog (the data dictionary), producing Resolved Expressions;
      3. Generate the Logical Plan: each node of the tree is visited in turn and its construct method is called, converting the node originally expressed as a treeNode into Calcite's internal data structure, RelNode. This produces the logical plan, represented by RelNode;
      4. Generate the optimized LogicalPlan: the logical plan is first optimized with Calcite rules,
        then with Flink's own optimization rules;
      5. Generate the Flink PhysicalPlan: again based on Flink rules, the optimized logical plan is converted into Flink's physical execution plan;
      6. Convert the physical execution plan into a Flink ExecutionPlan: the corresponding translateToPlan methods are called and code generation (CodeGen) produces the various Flink operators.

      As you can see, the Table API and SQL share the same flow once a RelNode has been obtained; they only differ in how the RelNode is obtained:

      • Table API: a RelBuilder is used to obtain the RelNode (LogicalNodes and Expressions are converted into RelNodes and RexNodes respectively); a small RelBuilder sketch follows this list;
      • SQL: the Planner is used. The parse method first turns the user's SQL text into a parse tree represented by SqlNode. The validate method then uses the metadata to resolve fields, determine types, check validity, and so on. Finally the rel method converts the SqlNode into a RelNode;
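
      For the Table API side, a minimal sketch of what "obtaining a RelNode through a RelBuilder" looks like in plain Calcite is shown below (the "Orders" table and the configuration are illustrative assumptions; this is not Flink's actual wiring):

      import org.apache.calcite.plan.RelOptUtil;
      import org.apache.calcite.rel.RelNode;
      import org.apache.calcite.schema.SchemaPlus;
      import org.apache.calcite.sql.fun.SqlStdOperatorTable;
      import org.apache.calcite.tools.FrameworkConfig;
      import org.apache.calcite.tools.Frameworks;
      import org.apache.calcite.tools.RelBuilder;

      public class RelBuilderSketch {
          public static void main(String[] args) {
              SchemaPlus rootSchema = Frameworks.createRootSchema(true);
              FrameworkConfig config = Frameworks.newConfigBuilder().defaultSchema(rootSchema).build();
              RelBuilder builder = RelBuilder.create(config);

              // scan -> filter -> project: the RelNode counterpart of
              // "SELECT product, amount FROM Orders WHERE amount > 2"
              RelNode plan = builder
                      .scan("Orders")
                      .filter(builder.call(SqlStdOperatorTable.GREATER_THAN,
                              builder.field("amount"), builder.literal(2)))
                      .project(builder.field("product"), builder.field("amount"))
                      .build();
              System.out.println(RelOptUtil.toString(plan));
          }
      }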

      1. The TableEnvironment Object

      The TableEnvironment object is at the core of the Table API and SQL integration. It supports the following scenarios:

      • Registering a Table.
      • Registering a TableSource with the TableEnvironment; a TableSource exposes data stored in an external system (e.g. MySQL, HBase, CSV, Kafka, RabbitMQ) as a Table.
      • Registering an external catalog to access data or files in external systems.
      • Executing SQL queries.
      • Registering user-defined functions.
      • Converting a DataStream or DataSet into a Table.

      A query can only be bound to one specific TableEnvironment. A TableEnvironment is configured through a TableConfig, which lets you customize the query optimization and translation process.
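
      Before diving into the internals, here is a minimal usage sketch of the TableEnvironment (Blink planner, streaming mode, Flink 1.10-era API; the Order POJO and table name are illustrative). It exercises exactly the path that the rest of this article traces: register a table, run sqlQuery, convert back to a DataStream:

      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.table.api.EnvironmentSettings;
      import org.apache.flink.table.api.Table;
      import org.apache.flink.table.api.java.StreamTableEnvironment;

      public class TableEnvironmentSketch {
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              EnvironmentSettings settings =
                      EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
              StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

              DataStream<Order> orders =
                      env.fromElements(new Order(1L, "beer", 3), new Order(2L, "rubber", 1));
              tEnv.createTemporaryView("Orders", orders);                              // register a Table
              Table result = tEnv.sqlQuery("SELECT * FROM Orders WHERE amount > 2");   // parse / validate / optimize
              tEnv.toAppendStream(result, Order.class).print();                        // translate to DataStream operators
              env.execute("table-env-sketch");
          }

          /** Simple POJO used as the row type. */
          public static class Order {
              public Long user;
              public String product;
              public int amount;
              public Order() {}
              public Order(Long user, String product, int amount) {
                  this.user = user; this.product = product; this.amount = amount;
              }
          }
      }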

      The TableEnvironment execution flow is as follows:

      • TableEnvironment.sql() is the entry point;

      • Flink implements FlinkPlannerImpl, which performs the parse(sql), validate(sqlNode), and rel(sqlNode) operations;

      • a Table is produced;

      A summary of the relevant code:

      package org.apache.flink.table.api.internal;
      
      @Internal
      public class TableEnvironmentImpl implements TableEnvironment {
      	private final CatalogManager catalogManager;
      	private final ModuleManager moduleManager;
      	private final OperationTreeBuilder operationTreeBuilder;
      	private final List<ModifyOperation> bufferedModifyOperations = new ArrayList<>();
      
      	protected final TableConfig tableConfig;
      	protected final Executor execEnv;
      	protected final FunctionCatalog functionCatalog;
      	protected final Planner planner;
      	protected final Parser parser;  
      }  
      
      // The class contents printed while debugging:
      
      this = {StreamTableEnvironmentImpl@4701} 
       functionCatalog = {FunctionCatalog@4702} 
       scalaExecutionEnvironment = {StreamExecutionEnvironment@4703} 
       planner = {StreamPlanner@4704} 
        config = {TableConfig@4708} 
        executor = {StreamExecutor@4709} 
        PlannerBase.config = {TableConfig@4708} 
        functionCatalog = {FunctionCatalog@4702} 
        catalogManager = {CatalogManager@1250} 
        isStreamingMode = true
        plannerContext = {PlannerContext@4711} 
        parser = {ParserImpl@4696} 
       catalogManager = {CatalogManager@1250} 
       moduleManager = {ModuleManager@4705} 
       operationTreeBuilder = {OperationTreeBuilder@4706} 
       bufferedModifyOperations = {ArrayList@4707}  size = 0
       tableConfig = {TableConfig@4708} 
       execEnv = {StreamExecutor@4709} 
       TableEnvironmentImpl.functionCatalog = {FunctionCatalog@4702} 
       TableEnvironmentImpl.planner = {StreamPlanner@4704} 
       parser = {ParserImpl@4696} 
       registration = {TableEnvironmentImpl$1@4710} 
      

      2. Catalog

      Catalog – defines metadata and namespaces, including Schema (database), Table, and RelDataType (type information).

      All metadata about databases and tables is kept in Flink's internal Catalog structure, which stores all Table-related metadata inside Flink, including table schemas, data source information, and so on.
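
      As a minimal sketch (catalog and database names are illustrative), an additional in-memory catalog can be plugged into the TableEnvironment like this; tables registered afterwards end up in the CatalogManager maps shown in the debug dump below:

      import org.apache.flink.table.api.EnvironmentSettings;
      import org.apache.flink.table.api.TableEnvironment;
      import org.apache.flink.table.catalog.GenericInMemoryCatalog;

      public class CatalogSketch {
          public static void main(String[] args) {
              TableEnvironment tEnv = TableEnvironment.create(
                      EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

              GenericInMemoryCatalog myCatalog = new GenericInMemoryCatalog("my_catalog", "my_database");
              tEnv.registerCatalog("my_catalog", myCatalog);
              tEnv.useCatalog("my_catalog");      // unqualified table names now resolve against my_catalog
              tEnv.useDatabase("my_database");
          }
      }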

      // The TableEnvironment contains a CatalogManager
      public final class CatalogManager {
      	// A map between names and catalogs.
      	private Map<String, Catalog> catalogs;  
      } 
      
      // The Catalog interface
      public interface Catalog {
        ......
        	default Optional<TableFactory> getTableFactory() {
      		return Optional.empty();
      	}
        ......
      }   
      
      // When the data source is defined inside the program, the corresponding catalog is GenericInMemoryCatalog
      public class GenericInMemoryCatalog extends AbstractCatalog {
      	public static final String DEFAULT_DB = "default";
      	private final Map<String, CatalogDatabase> databases;
      	private final Map<ObjectPath, CatalogBaseTable> tables;
      	private final Map<ObjectPath, CatalogFunction> functions;
      	private final Map<ObjectPath, Map<CatalogPartitionSpec, CatalogPartition>> partitions;
      
      	private final Map<ObjectPath, CatalogTableStatistics> tableStats;
      	private final Map<ObjectPath, CatalogColumnStatistics> tableColumnStats;
      	private final Map<ObjectPath, Map<CatalogPartitionSpec, CatalogTableStatistics>> partitionStats;
      	private final Map<ObjectPath, Map<CatalogPartitionSpec, CatalogColumnStatistics>> partitionColumnStats;
      }  
      
      // Contents observed while debugging
      
      catalogManager = {CatalogManager@4646} 
       catalogs = {LinkedHashMap@4652}  size = 1
        "default_catalog" -> {GenericInMemoryCatalog@4659} 
         key = "default_catalog"
          value = {char[15]@4668} 
          hash = 552406043
         value = {GenericInMemoryCatalog@4659} 
          databases = {LinkedHashMap@4660}  size = 1
          tables = {LinkedHashMap@4661}  size = 0
          functions = {LinkedHashMap@4662}  size = 0
          partitions = {LinkedHashMap@4663}  size = 0
          tableStats = {LinkedHashMap@4664}  size = 0
          tableColumnStats = {LinkedHashMap@4665}  size = 0
          partitionStats = {LinkedHashMap@4666}  size = 0
          partitionColumnStats = {LinkedHashMap@4667}  size = 0
          catalogName = "default_catalog"
          defaultDatabase = "default_database"
       temporaryTables = {HashMap@4653}  size = 2
       currentCatalogName = "default_catalog"
       currentDatabaseName = "default_database"
       builtInCatalogName = "default_catalog"
      

      3. StreamPlanner

      StreamPlanner is one flavor of the new Blink planner.

      The new Flink Table architecture makes the query processor pluggable: the community kept the original Flink planner (the old planner) fully intact and introduced the new Blink planner alongside it, so users can choose between the old planner and the Blink planner.
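
      The planner choice itself is made through EnvironmentSettings; a minimal sketch of the switch (Flink 1.9/1.10-era API):

      import org.apache.flink.table.api.EnvironmentSettings;

      public class PlannerSelectionSketch {
          public static void main(String[] args) {
              // Passed to StreamTableEnvironment.create(env, settings), these settings decide whether
              // the legacy planner or the Blink planner (StreamPlanner / BatchPlanner) is instantiated.
              EnvironmentSettings oldPlanner =
                      EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
              EnvironmentSettings blinkPlanner =
                      EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
          }
      }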

      In terms of the model, the old planner did not unify streaming and batch jobs; the two are implemented differently and are translated to the DataStream API and the DataSet API respectively. The Blink planner instead treats a batch data set as a bounded DataStream, so both streaming and batch jobs are ultimately translated to the Transformation API. In terms of architecture, the Blink planner provides a BatchPlanner and a StreamPlanner for batch and streaming; the two share most of their code and a great deal of optimization logic. The old planner's batch and streaming implementations are two completely separate code bases with essentially no shared implementation or optimization logic.

      Beyond these advantages in model and architecture, the Blink planner has also accumulated many practical features, concentrated in three areas:

      • The Blink planner improves the code generation mechanism, optimizes some operators, and offers a rich set of practical new features, such as dimension-table joins, Top-N, MiniBatch, streaming deduplication, and data-skew mitigation for aggregations.
      • The Blink planner's optimization strategy is based on a common-subgraph optimization algorithm and includes both cost-based optimization (CBO) and rule-based optimization (RBO), so the optimization is more comprehensive. The Blink planner can also obtain data-source statistics from the catalog, which is very important for CBO.
      • The Blink planner provides more built-in functions and more standard SQL support; Flink 1.9 already fully supports TPC-H, and support for the more advanced TPC-DS is planned for the next release.

      In terms of the concrete code, the StreamPlanner shows up in translateToPlan, which dispatches to the different StreamOperator generation paths.

      class StreamPlanner(
          executor: Executor,
          config: TableConfig,
          functionCatalog: FunctionCatalog,
          catalogManager: CatalogManager)
        extends PlannerBase(executor, config, functionCatalog, catalogManager, isStreamingMode = true) {
      
        override protected def translateToPlan(
            execNodes: util.List[ExecNode[_, _]]): util.List[Transformation[_]] = {
          execNodes.map {
            case node: StreamExecNode[_] => node.translateToPlan(this)
            case _ =>
              throw new TableException("Cannot generate DataStream due to an invalid logical plan. " +
                "This is a bug and should not happen. Please file an issue.")
          }
        }
      }
      
      @Internal
      public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl implements StreamTableEnvironment {
      
      	private <T> DataStream<T> toDataStream(Table table, OutputConversionModifyOperation modifyOperation) {
          
          // When converting back to a DataStream, the planner is invoked here to generate the plan.
          
      		List<Transformation<?>> transformations = planner.translate(Collections.singletonList(modifyOperation));
      
      		Transformation<T> transformation = getTransformation(table, transformations);
      
      		executionEnvironment.addOperator(transformation);
      		return new DataStream<>(executionEnvironment, transformation);
      	}
      }
      
      // The runtime call stack printed while debugging
      
      translateToPlanInternal:85, StreamExecUnion (org.apache.Flink.table.planner.plan.nodes.physical.stream)
      translateToPlanInternal:39, StreamExecUnion (org.apache.Flink.table.planner.plan.nodes.physical.stream)
      translateToPlan:58, ExecNode$class (org.apache.Flink.table.planner.plan.nodes.exec)
      translateToPlan:39, StreamExecUnion (org.apache.Flink.table.planner.plan.nodes.physical.stream)
      translateToTransformation:184, StreamExecSink (org.apache.Flink.table.planner.plan.nodes.physical.stream)
      translateToPlanInternal:153, StreamExecSink (org.apache.Flink.table.planner.plan.nodes.physical.stream)
      translateToPlanInternal:48, StreamExecSink (org.apache.Flink.table.planner.plan.nodes.physical.stream)
      translateToPlan:58, ExecNode$class (org.apache.Flink.table.planner.plan.nodes.exec)
      translateToPlan:48, StreamExecSink (org.apache.Flink.table.planner.plan.nodes.physical.stream)
      apply:60, StreamPlanner$$anonfun$translateToPlan$1 (org.apache.Flink.table.planner.delegation)
      apply:59, StreamPlanner$$anonfun$translateToPlan$1 (org.apache.Flink.table.planner.delegation)
      apply:234, TraversableLike$$anonfun$map$1 (scala.collection)
      apply:234, TraversableLike$$anonfun$map$1 (scala.collection)
      foreach:891, Iterator$class (scala.collection)
      foreach:1334, AbstractIterator (scala.collection)
      foreach:72, IterableLike$class (scala.collection)
      foreach:54, AbstractIterable (scala.collection)
      map:234, TraversableLike$class (scala.collection)
      map:104, AbstractTraversable (scala.collection)
      translateToPlan:59, StreamPlanner (org.apache.Flink.table.planner.delegation)
      translate:153, PlannerBase (org.apache.Flink.table.planner.delegation)
      toDataStream:210, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
      toAppendStream:107, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
      toAppendStream:101, TableConversions (org.apache.Flink.table.api.scala)
      main:89, StreamSQLExample$ (spendreport)
      main:-1, StreamSQLExample (spendreport)
      

      4. FlinkPlannerImpl

      Flink implements FlinkPlannerImpl as the bridge to Calcite; it performs the parse(sql), validate(sqlNode), and rel(sqlNode) operations.

      class FlinkPlannerImpl(
          config: FrameworkConfig,
          catalogReaderSupplier: JFunction[JBoolean, CalciteCatalogReader],
          typeFactory: FlinkTypeFactory,
          cluster: RelOptCluster) {
      
        val operatorTable: SqlOperatorTable = config.getOperatorTable
        val parser: CalciteParser = new CalciteParser(config.getParserConfig)
        val convertletTable: SqlRexConvertletTable = config.getConvertletTable
        val sqlToRelConverterConfig: SqlToRelConverter.Config = config.getSqlToRelConverterConfig
      }
      
      // FlinkPlannerImpl is used here
      public class ParserImpl implements Parser {
      	private final CatalogManager catalogManager;
      	private final Supplier<FlinkPlannerImpl> validatorSupplier;
      	private final Supplier<CalciteParser> calciteParserSupplier;
        
      	@Override
      	public List<Operation> parse(String statement) {
      		CalciteParser parser = calciteParserSupplier.get();
          // FlinkPlannerImpl is used here
      		FlinkPlannerImpl planner = validatorSupplier.get();
      		// parse the sql query
      		SqlNode parsed = parser.parse(statement);
      		Operation operation = SqlToOperationConverter.convert(planner, catalogManager, parsed)
      			.orElseThrow(() -> new TableException("Unsupported query: " + statement));
      		return Collections.singletonList(operation);
      	}  
      }
      
      // Contents observed while debugging
      
      planner = {FlinkPlannerImpl@4659} 
       config = {Frameworks$StdFrameworkConfig@4685} 
       catalogReaderSupplier = {PlannerContext$lambda@4686} 
       typeFactory = {FlinkTypeFactory@4687} 
       cluster = {FlinkRelOptCluster@4688} 
       operatorTable = {ChainedSqlOperatorTable@4689} 
       parser = {CalciteParser@4690} 
       convertletTable = {StandardConvertletTable@4691} 
       sqlToRelConverterConfig = {SqlToRelConverter$ConfigImpl@4692} 
       validator = null
         
      // Call stack no. 1
         
      validate:104, FlinkPlannerImpl (org.apache.Flink.table.planner.calcite)
      convert:127, SqlToOperationConverter (org.apache.Flink.table.planner.operations)
      parse:66, ParserImpl (org.apache.Flink.table.planner.delegation)
      sqlQuery:464, TableEnvironmentImpl (org.apache.Flink.table.api.internal)
      main:82, StreamSQLExample$ (spendreport)
      main:-1, StreamSQLExample (spendreport)
        
      // Call stack no. 2
        
      rel:135, FlinkPlannerImpl (org.apache.Flink.table.planner.calcite)
      toQueryOperation:522, SqlToOperationConverter (org.apache.Flink.table.planner.operations)
      convertSqlQuery:436, SqlToOperationConverter (org.apache.Flink.table.planner.operations)
      convert:154, SqlToOperationConverter (org.apache.Flink.table.planner.operations)
      parse:66, ParserImpl (org.apache.Flink.table.planner.delegation)
      sqlQuery:464, TableEnvironmentImpl (org.apache.Flink.table.api.internal)
      main:82, StreamSQLExample$ (spendreport)
      main:-1, StreamSQLExample (spendreport)  
      

      5. Table and TableImpl

      As the code shows, this class merely wraps up the various related operations and pieces of information; it does not contain much actual logic.
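
      A minimal sketch of the API call that ends up in TableImpl.joinInternal shown below (table names and the join key are illustrative assumptions):

      import org.apache.flink.table.api.Table;
      import org.apache.flink.table.api.TableEnvironment;

      public class JoinApiSketch {
          public static Table leftOuterJoinOrders(TableEnvironment tEnv) {
              Table orders = tEnv.scan("Orders");
              Table payments = tEnv.scan("Payment");
              // internally this goes through TableImpl.joinInternal / operationTreeBuilder.join(...)
              return orders.leftOuterJoin(payments, "orderId = pId")
                           .select("orderId, productName, payType");
          }
      }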

      @Internal
      public class TableImpl implements Table {
      
      	private static final AtomicInteger uniqueId = new AtomicInteger(0);
      
      	private final TableEnvironment tableEnvironment;
      	private final QueryOperation operationTree;
      	private final OperationTreeBuilder operationTreeBuilder;
      	private final LookupCallResolver lookupResolver;
        
      	private TableImpl joinInternal(
      			Table right,
      			Optional<Expression> joinPredicate,
      			JoinType joinType) {
      		verifyTableCompatible(right);
      
      		return createTable(operationTreeBuilder.join(
      			this.operationTree,
      			right.getQueryOperation(),
      			joinType,
      			joinPredicate,
      			false));
      	}
      }
      
      // Contents observed while debugging
      
      view = {TableImpl@4583} "UnnamedTable$0"
       tableEnvironment = {StreamTableEnvironmentImpl@4580} 
        functionCatalog = {FunctionCatalog@4646} 
        scalaExecutionEnvironment = {StreamExecutionEnvironment@4579} 
        planner = {StreamPlanner@4647} 
        catalogManager = {CatalogManager@4644} 
        moduleManager = {ModuleManager@4648} 
        operationTreeBuilder = {OperationTreeBuilder@4649} 
        bufferedModifyOperations = {ArrayList@4650}  size = 0
        tableConfig = {TableConfig@4651} 
        execEnv = {StreamExecutor@4652} 
        TableEnvironmentImpl.functionCatalog = {FunctionCatalog@4646} 
        TableEnvironmentImpl.planner = {StreamPlanner@4647} 
        parser = {ParserImpl@4653} 
        registration = {TableEnvironmentImpl$1@4654} 
       operationTree = {ScalaDataStreamQueryOperation@4665} 
        identifier = null
        dataStream = {DataStreamSource@4676} 
        fieldIndices = {int[2]@4677} 
        tableSchema = {TableSchema@4678} "root\n |-- orderId: STRING\n |-- productName: STRING\n"
       operationTreeBuilder = {OperationTreeBuilder@4649} 
        config = {TableConfig@4651} 
        functionCatalog = {FunctionCatalog@4646} 
        tableReferenceLookup = {TableEnvironmentImpl$lambda@4668} 
        lookupResolver = {LookupCallResolver@4669} 
        projectionOperationFactory = {ProjectionOperationFactory@4670} 
        sortOperationFactory = {SortOperationFactory@4671} 
        calculatedTableFactory = {CalculatedTableFactory@4672} 
        setOperationFactory = {SetOperationFactory@4673} 
        aggregateOperationFactory = {AggregateOperationFactory@4674} 
        joinOperationFactory = {JoinOperationFactory@4675} 
       lookupResolver = {LookupCallResolver@4666} 
        functionLookup = {FunctionCatalog@4646} 
       tableName = "UnnamedTable$0"
        value = {char[14]@4667} 
        hash = 1355882650
      

      1. SQL Parsing Phase (SQL -> SqlNode)

      This corresponds to the part of the roadmap shown earlier whose purpose is to produce a SqlNode such as SqlJoin:

      // NOTE : execution order is top to bottom; " -----> " marks the type of instance produced
      * 
      *        +-----> "left outer JOIN" (SQL statement)
      *        |   
      *        |     
      *     SqlParser.parseQuery // SQL parsing phase: generate the AST (abstract syntax tree); SQL -> SqlNode
      *        |   
      *        |  
      *        +-----> SqlJoin (SqlNode)
      *        |   
      *        |   
      

      Calcite uses JavaCC for SQL parsing. From the Parser.jj file defined in Calcite, JavaCC generates a set of Java classes that convert the SQL text into the AST data structure (here of type SqlNode).

      In other words: the SQL is converted into an AST (abstract syntax tree), represented in Calcite by SqlNode. A minimal, Flink-independent sketch follows.
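
      The sketch uses Calcite's own parser (the statement is illustrative; Flink actually uses the generated FlinkSqlParserImpl with its own parser configuration):

      import org.apache.calcite.sql.SqlNode;
      import org.apache.calcite.sql.parser.SqlParser;

      public class SqlParseSketch {
          public static void main(String[] args) throws Exception {
              SqlParser parser = SqlParser.create(
                      "SELECT o.orderId, p.payType FROM Orders AS o LEFT JOIN Payment AS p ON o.orderId = p.orderId");
              SqlNode ast = parser.parseQuery();   // SQL text -> SqlNode (AST), no validation yet
              System.out.println(ast.getKind());   // SELECT
              System.out.println(ast.toString());  // pretty-printed SQL with quoted identifiers, similar to the dump below
          }
      }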

      package org.apache.flink.table.planner.delegation;
      
      public class ParserImpl implements Parser {
      	@Override
      	public List<Operation> parse(String statement) {
      		CalciteParser parser = calciteParserSupplier.get();
      		FlinkPlannerImpl planner = validatorSupplier.get();
          
      		// parse the sql query
      		SqlNode parsed = parser.parse(statement);
      
      		Operation operation = SqlToOperationConverter.convert(planner, catalogManager, parsed)
      			.orElseThrow(() -> new TableException("Unsupported query: " + statement));
      		return Collections.singletonList(operation);
      	}  
      }  
      
      // Printing the parsed result shows the basic shape of a SqlNode.
        
      parsed = {SqlBasicCall@4690} "SELECT *\nFROM `UnnamedTable$0`\nWHERE `amount` > 2\nUNION ALL\nSELECT *\nFROM `OrderB`\nWHERE `amount` < 2"
       operator = {SqlSetOperator@4716} "UNION ALL"
        all = true
        name = "UNION ALL"
        kind = {SqlKind@4742} "UNION"
        leftPrec = 14
        rightPrec = 15
        returnTypeInference = {ReturnTypes$lambda@4743} 
        operandTypeInference = null
        operandTypeChecker = {SetopOperandTypeChecker@4744} 
       operands = {SqlNode[2]@4717} 
        0 = {SqlSelect@4746} "SELECT *\nFROM `UnnamedTable$0`\nWHERE `amount` > 2"
        1 = {SqlSelect@4747} "SELECT *\nFROM `OrderB`\nWHERE `amount` < 2"
       functionQuantifier = null
       expanded = false
       pos = {SqlParserPos@4719} "line 2, column 1"  
          
      // The related debug stack, which can help you dig deeper
          
      SqlStmt:3208, FlinkSqlParserImpl (org.apache.Flink.sql.parser.impl)
      SqlStmtEof:3732, FlinkSqlParserImpl (org.apache.Flink.sql.parser.impl)
      parseSqlStmtEof:234, FlinkSqlParserImpl (org.apache.Flink.sql.parser.impl)
      parseQuery:160, SqlParser (org.apache.calcite.sql.parser)
      parseStmt:187, SqlParser (org.apache.calcite.sql.parser)
      parse:48, CalciteParser (org.apache.Flink.table.planner.calcite)
      parse:64, ParserImpl (org.apache.Flink.table.planner.delegation)
      sqlQuery:464, TableEnvironmentImpl (org.apache.Flink.table.api.internal)
      main:82, StreamSQLExample$ (spendreport)
      main:-1, StreamSQLExample (spendreport)
        
      // Another example, in FlinkSqlParserImpl.FromClause
          
      e = {SqlJoin@4709} "`Orders` AS `o`\nLEFT JOIN `Payment` AS `p` ON `o`.`orderId` = `p`.`orderId`"
       left = {SqlBasicCall@4676} "`Orders` AS `o`"
        operator = {SqlAsOperator@4752} "AS"
        operands = {SqlNode[2]@4753} 
        functionQuantifier = null
        expanded = false
        pos = {SqlParserPos@4755} "line 7, column 3"
       natural = {SqlLiteral@4677} "FALSE"
        typeName = {SqlTypeName@4775} "BOOLEAN"
        value = {Boolean@4776} false
        pos = {SqlParserPos@4777} "line 7, column 13"
       joinType = {SqlLiteral@4678} "LEFT"
        typeName = {SqlTypeName@4758} "SYMBOL"
        value = {JoinType@4759} "LEFT"
        pos = {SqlParserPos@4724} "line 7, column 26"
       right = {SqlBasicCall@4679} "`Payment` AS `p`"
        operator = {SqlAsOperator@4752} "AS"
        operands = {SqlNode[2]@4763} 
        functionQuantifier = null
        expanded = false
        pos = {SqlParserPos@4764} "line 7, column 31"
       conditionType = {SqlLiteral@4680} "ON"
        typeName = {SqlTypeName@4758} "SYMBOL"
        value = {JoinConditionType@4771} "ON"
        pos = {SqlParserPos@4772} "line 7, column 44"
       condition = {SqlBasicCall@4681} "`o`.`orderId` = `p`.`orderId`"
        operator = {SqlBinaryOperator@4766} "="
        operands = {SqlNode[2]@4767} 
        functionQuantifier = null
        expanded = false
        pos = {SqlParserPos@4768} "line 7, column 47"
       pos = {SqlParserPos@4724} "line 7, column 26" 
              
      // The related debug stack, which can help you dig deeper
          
      FromClause:10192, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
      SqlSelect:5918, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
      LeafQuery:630, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
      LeafQueryOrExpr:15651, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
      QueryOrExpr:15118, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
      OrderedQueryOrExpr:504, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
      SqlStmt:3693, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
      SqlStmtEof:3732, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
      parseSqlStmtEof:234, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
      parseQuery:160, SqlParser (org.apache.calcite.sql.parser)
      parseStmt:187, SqlParser (org.apache.calcite.sql.parser)
      parse:48, CalciteParser (org.apache.flink.table.planner.calcite)
      parse:64, ParserImpl (org.apache.flink.table.planner.delegation)
      sqlQuery:464, TableEnvironmentImpl (org.apache.flink.table.api.internal)
      main:73, SimpleOuterJoin$ (spendreport)
      main:-1, SimpleOuterJoin (spendreport)  
      

      2. SqlNode Validation (SqlNode -> SqlNode)

      The first step above produces a SqlNode object, which is an unvalidated abstract syntax tree. Next comes the validation phase, which needs the metadata: it checks table names, field names, function names, and data types.

      In other words: syntax/semantic checking against the metadata; after validation the AST is still represented by SqlNode.

      package org.apache.flink.table.planner.operations;
      
      public class SqlToOperationConverter {
      	public static Optional<Operation> convert(
      			FlinkPlannerImpl flinkPlanner,
      			CatalogManager catalogManager,
      			SqlNode sqlNode) {
          // validate is invoked here
      		final SqlNode validated = flinkPlanner.validate(sqlNode);
      		SqlToOperationConverter converter = new SqlToOperationConverter(flinkPlanner, catalogManager);
          ......
      	}    
      }
          
      // Printing the validated result:
          
      validated = {SqlBasicCall@4675} "SELECT `UnnamedTable$0`.`user`, `UnnamedTable$0`.`product`, `UnnamedTable$0`.`amount`\nFROM `default_catalog`.`default_database`.`UnnamedTable$0` AS `UnnamedTable$0`\nWHERE `UnnamedTable$0`.`amount` > 2\nUNION ALL\nSELECT `OrderB`.`user`, `OrderB`.`product`, `OrderB`.`amount`\nFROM `default_catalog`.`default_database`.`OrderB` AS `OrderB`\nWHERE `OrderB`.`amount` < 2"
       operator = {SqlSetOperator@5000} "UNION ALL"
        all = true
        name = "UNION ALL"
        kind = {SqlKind@5029} "UNION"
        leftPrec = 14
        rightPrec = 15
        returnTypeInference = {ReturnTypes$lambda@5030} 
        operandTypeInference = null
        operandTypeChecker = {SetopOperandTypeChecker@5031} 
       operands = {SqlNode[2]@5001} 
        0 = {SqlSelect@4840} "SELECT `UnnamedTable$0`.`user`, `UnnamedTable$0`.`product`, `UnnamedTable$0`.`amount`\nFROM `default_catalog`.`default_database`.`UnnamedTable$0` AS `UnnamedTable$0`\nWHERE `UnnamedTable$0`.`amount` > 2"
        1 = {SqlSelect@5026} "SELECT `OrderB`.`user`, `OrderB`.`product`, `OrderB`.`amount`\nFROM `default_catalog`.`default_database`.`OrderB` AS `OrderB`\nWHERE `OrderB`.`amount` < 2"
       functionQuantifier = null
       expanded = false
       pos = {SqlParserPos@5003} "line 2, column 1"    
          
      // The related debug stack, which can help you dig deeper
          
      validate:81, AbstractNamespace (org.apache.calcite.sql.validate)
      validateNamespace:1008, SqlValidatorImpl (org.apache.calcite.sql.validate)
      validateQuery:968, SqlValidatorImpl (org.apache.calcite.sql.validate)
      validateCall:90, SqlSetOperator (org.apache.calcite.sql)
      validateCall:5304, SqlValidatorImpl (org.apache.calcite.sql.validate)
      validate:116, SqlCall (org.apache.calcite.sql)
      validateScopedExpression:943, SqlValidatorImpl (org.apache.calcite.sql.validate)
      validate:650, SqlValidatorImpl (org.apache.calcite.sql.validate)
      org$apache$Flink$table$planner$calcite$FlinkPlannerImpl$$validate:126, FlinkPlannerImpl (org.apache.Flink.table.planner.calcite)
      validate:105, FlinkPlannerImpl (org.apache.Flink.table.planner.calcite)
      convert:127, SqlToOperationConverter (org.apache.Flink.table.planner.operations)
      parse:66, ParserImpl (org.apache.Flink.table.planner.delegation)
      sqlQuery:464, TableEnvironmentImpl (org.apache.Flink.table.api.internal)
      main:82, StreamSQLExample$ (spendreport)
      main:-1, StreamSQLExample (spendreport)    
      

      3. Semantic Analysis (SqlNode -> RelNode/RexNode)

      In the roadmap, we have now arrived here:

      // NOTE : execution order is top to bottom; " -----> " marks the type of instance produced
      * 
      *        +-----> "left outer JOIN" (SQL statement)
      *        |   
      *        |     
      *     SqlParser.parseQuery // SQL parsing phase: generate the AST (abstract syntax tree); SQL -> SqlNode
      *        |   
      *        |      
      *        +-----> SqlJoin (SqlNode)
      *        |   
      *        |     
      *     SqlToRelConverter.convertQuery // semantic analysis: generate the logical plan; SqlNode -> RelNode
      *        |    
      *        |     
      *        +-----> LogicalProject (RelNode) // Abstract Syntax Tree, unoptimized RelNode
      *        |      
      *        |     
      

      After the second step, the SqlNode tree has been validated. This step converts the SqlNode into RelNode/RexNode, i.e. it produces the corresponding logical plan (Logical Plan).

      In other words: semantic analysis builds the RelNode tree from the SqlNode and the metadata, which is the first version of the logical plan (Logical Plan).

      Starting from the logical plan Flink has produced, it is converted into Calcite's logical plan, so that Calcite's powerful optimization rules can be applied.

      Flink calls the construct method of each node from top to bottom, converting Flink nodes into Calcite RelNode nodes. The real work happens in the convertQueryRecursive() method.

      For example, the call chain that produces a LogicalProject looks roughly like this:

      createJoin:378, RelFactories$JoinFactoryImpl (org.apache.calcite.rel.core)
      createJoin:2520, SqlToRelConverter (org.apache.calcite.sql2rel)
      convertFrom:2111, SqlToRelConverter (org.apache.calcite.sql2rel)
      convertSelectImpl:646, SqlToRelConverter (org.apache.calcite.sql2rel)
      convertSelect:627, SqlToRelConverter (org.apache.calcite.sql2rel)
      convertQueryRecursive:3181, SqlToRelConverter (org.apache.calcite.sql2rel)
      convertQuery:563, SqlToRelConverter (org.apache.calcite.sql2rel)
      org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel:148, FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
      rel:135, FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
      toQueryOperation:522, SqlToOperationConverter (org.apache.flink.table.planner.operations)
      convertSqlQuery:436, SqlToOperationConverter (org.apache.flink.table.planner.operations)
      convert:154, SqlToOperationConverter (org.apache.flink.table.planner.operations)
      parse:66, ParserImpl (org.apache.flink.table.planner.delegation)
      sqlQuery:464, TableEnvironmentImpl (org.apache.flink.table.api.internal)
      main:73, SimpleOuterJoin$ (spendreport)
      main:-1, SimpleOuterJoin (spendreport)
      

      The detailed source code is as follows:

      convertQuery() in SqlToRelConverter converts a SqlNode into a RelRoot.
        
      public class SqlToRelConverter {  
          public RelRoot convertQuery(SqlNode query, boolean needsValidation, boolean top) {
              if (needsValidation) {
                  query = this.validator.validate(query);
              }
              RelMetadataQuery.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(this.cluster.getMetadataProvider()));
              RelNode result = this.convertQueryRecursive(query, top, (RelDataType)null).rel;
              if (top && isStream(query)) {
                  result = new LogicalDelta(this.cluster, ((RelNode)result).getTraitSet(), (RelNode)result);
              }
      
              RelCollation collation = RelCollations.EMPTY;
              if (!query.isA(SqlKind.DML) && isOrdered(query)) {
                  collation = this.requiredCollation((RelNode)result);
              }
      
              this.checkConvertedType(query, (RelNode)result);
      
              RelDataType validatedRowType = this.validator.getValidatedNodeType(query);
            
            // The root is set here
              return RelRoot.of((RelNode)result, validatedRowType, query.getKind()).withCollation(collation);
          }
      }
      
      // Printed at this point
      toQueryOperation:523, SqlToOperationConverter (org.apache.Flink.table.planner.operations)
      
      // The output below shows the real structure of a RelRoot
        
      relational = {RelRoot@5248} "Root {kind: UNION, rel: LogicalUnion#6, rowType: RecordType(BIGINT user, VARCHAR(2147483647) product, INTEGER amount), fields: [<0, user>, <1, product>, <2, amount>], collation: []}"
       rel = {LogicalUnion@5227} "LogicalUnion#6"
        inputs = {RegularImmutableList@5272}  size = 2
        kind = {SqlKind@5029} "UNION"
        all = true
        desc = "LogicalUnion#6"
        rowType = {RelRecordType@5238} "RecordType(BIGINT user, VARCHAR(2147483647) product, INTEGER amount)"
        digest = "LogicalUnion#6"
        cluster = {FlinkRelOptCluster@4800} 
        id = 6
        traitSet = {RelTraitSet@5273}  size = 5
       validatedRowType = {RelRecordType@5238} "RecordType(BIGINT user, VARCHAR(2147483647) product, INTEGER amount)"
        kind = {StructKind@5268} "FULLY_QUALIFIED"
        nullable = false
        fieldList = {RegularImmutableList@5269}  size = 3
        digest = "RecordType(BIGINT user, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" product, INTEGER amount) NOT NULL"
       kind = {SqlKind@5029} "UNION"
        lowerName = "union"
        sql = "UNION"
        name = "UNION"
        ordinal = 18
       fields = {RegularImmutableList@5254}  size = 3
        {Integer@5261} 0 -> "user"
        {Integer@5263} 1 -> "product"
        {Integer@5265} 2 -> "amount"
       collation = {RelCollationImpl@5237} "[]"
        fieldCollations = {RegularImmutableList@5256}  size = 0
      
      // Call stack
          
      convertQuery:561, SqlToRelConverter (org.apache.calcite.sql2rel)
      org$apache$Flink$table$planner$calcite$FlinkPlannerImpl$$rel:148, FlinkPlannerImpl (org.apache.Flink.table.planner.calcite)
      rel:135, FlinkPlannerImpl (org.apache.Flink.table.planner.calcite)
      toQueryOperation:522, SqlToOperationConverter (org.apache.Flink.table.planner.operations)
      convertSqlQuery:436, SqlToOperationConverter (org.apache.Flink.table.planner.operations)
      convert:154, SqlToOperationConverter (org.apache.Flink.table.planner.operations)
      parse:66, ParserImpl (org.apache.Flink.table.planner.delegation)
      sqlQuery:464, TableEnvironmentImpl (org.apache.Flink.table.api.internal)
      main:82, StreamSQLExample$ (spendreport)
      main:-1, StreamSQLExample (spendreport)
        
      // Another example: the generated LogicalProject
        
      bb = {SqlToRelConverter$Blackboard@4978} 
       scope = {SelectScope@4977} 
       nameToNodeMap = null
       root = {LogicalProject@5100} "LogicalProject#4"
        exps = {RegularImmutableList@5105}  size = 3
        input = {LogicalJoin@5106} "LogicalJoin#3"
        desc = "LogicalProject#4"
        rowType = {RelRecordType@5107} "RecordType(VARCHAR(2147483647) orderId, VARCHAR(2147483647) productName, VARCHAR(2147483647) payType)"
        digest = "LogicalProject#4"
        cluster = {FlinkRelOptCluster@4949} 
        id = 4
        traitSet = {RelTraitSet@5108}  size = 5
       inputs = {Collections$SingletonList@5111}  size = 1
       mapCorrelateToRex = {HashMap@5112}  size = 0
       isPatternVarRef = false
       cursors = {ArrayList@5113}  size = 0
       subQueryList = {LinkedHashSet@5114}  size = 0
       agg = null
       window = null
       mapRootRelToFieldProjection = {HashMap@5115}  size = 0
       columnMonotonicities = {ArrayList@5116}  size = 3
       systemFieldList = {ArrayList@5117}  size = 0
       top = true
       initializerExpressionFactory = {NullInitializerExpressionFactory@5118} 
       this$0 = {SqlToRelConverter@4926} 
      
      // For example, the join RelNode under the LogicalProject is created here.
      
         protected void convertFrom(SqlToRelConverter.Blackboard bb, SqlNode from) {
             switch (from.getKind()) {
                 ......
                 case JOIN:
                     RelNode joinRel = this.createJoin(fromBlackboard, leftRel, rightRel, conditionExp, convertedJoinType);
                     bb.setRoot(joinRel, false);
                 ......
             }
         }
        
      // Related call stack
      
      createJoin:378, RelFactories$JoinFactoryImpl (org.apache.calcite.rel.core)
      createJoin:2520, SqlToRelConverter (org.apache.calcite.sql2rel)
      convertFrom:2111, SqlToRelConverter (org.apache.calcite.sql2rel)
      convertSelectImpl:646, SqlToRelConverter (org.apache.calcite.sql2rel)
      convertSelect:627, SqlToRelConverter (org.apache.calcite.sql2rel)
      convertQueryRecursive:3181, SqlToRelConverter (org.apache.calcite.sql2rel)
      convertQuery:563, SqlToRelConverter (org.apache.calcite.sql2rel)
      org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel:148, FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
      rel:135, FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
      toQueryOperation:522, SqlToOperationConverter (org.apache.flink.table.planner.operations)
      convertSqlQuery:436, SqlToOperationConverter (org.apache.flink.table.planner.operations)
      convert:154, SqlToOperationConverter (org.apache.flink.table.planner.operations)
      parse:66, ParserImpl (org.apache.flink.table.planner.delegation)
      sqlQuery:464, TableEnvironmentImpl (org.apache.flink.table.api.internal)
      main:73, SimpleOuterJoin$ (spendreport)
      main:-1, SimpleOuterJoin (spendreport)
      

      4. Optimization Phase (RelNode -> RelNode)

      At this point the roadmap has reached:

      // NOTE : execution order is top to bottom; " -----> " marks the type of instance produced
      * 
      *        +-----> "left outer JOIN" (SQL statement)
      *        |   
      *        |     
      *     SqlParser.parseQuery // SQL parsing phase: generate the AST (abstract syntax tree); SQL -> SqlNode
      *        |   
      *        |      
      *        +-----> SqlJoin (SqlNode)
      *        |   
      *        |     
      *     SqlToRelConverter.convertQuery // semantic analysis: generate the logical plan; SqlNode -> RelNode
      *        |    
      *        |     
      *        +-----> LogicalProject (RelNode) // Abstract Syntax Tree, unoptimized RelNode
      *        |      
      *        |     
      *    FlinkLogicalJoinConverter (RelOptRule) // Flink-specific optimization rules
      *    VolcanoRuleCall.onMatch // optimize the Logical Plan with Flink's own optimization rules
      *        | 
      *        |   
      *        +-----> FlinkLogicalJoin (RelNode)  // Optimized Logical Plan, the logical execution plan
      *        |  
      *        |    
      *    StreamExecJoinRule (RelOptRule) // Rule that converts FlinkLogicalJoin without window bounds in join condition to StreamExecJoin
      *    VolcanoRuleCall.onMatch // convert the optimized LogicalPlan into Flink's physical execution plan
      *        |       
      *        |   
      *        +-----> StreamExecJoin (FlinkRelNode) // Stream physical RelNode, the physical execution plan
      *        |      
      *        |     
      

      The fourth phase is the heart of Calcite.

      In other words: logical plan optimization, the core of the optimizer, which optimizes the previously generated logical plan according to the corresponding rules.

      In Flink, this part is uniformly encapsulated in the optimize method. It consists of several stages, and each stage uses rules to optimize and improve the logical plan.

      The role of the optimizer

      The most central part of the Calcite architecture is the Optimizer; an optimization engine consists of three components:

      • rules: the matching rules. Calcite ships with hundreds of built-in rules for optimizing relational expressions, and custom rules are also supported;
      • metadata providers: they supply information to the optimizer that guides it towards its goal (reducing the overall cost), such as row counts or which column of a table is unique, and also the functions used to compute the cost of a subexpression in a RelNode tree;
      • planner engines: their main job is to keep firing rules until a given goal is reached; for example, a cost-based optimizer (CBO) aims to reduce cost (where cost includes the number of rows processed, CPU cost, IO cost, and so on).

      The optimizer turns the relational algebra expression produced by the parser into an execution plan for the execution engine, applying optimization rules along the way to help produce a more efficient plan. A typical optimization is filter push-down: performing the filter before the join, so that the join no longer has to be computed over the full input, which reduces the amount of data participating in the join.

      In Calcite, RelOptPlanner is the base class of the optimizers. Calcite provides two implementations:

      • HepPlanner: the rule-based optimization (RBO) implementation. It is a heuristic optimizer that keeps matching rules until the match limit is reached or a full traversal produces no more rule matches (a small HepPlanner sketch follows this list);
      • VolcanoPlanner: the cost-based optimization (CBO) implementation. It keeps iterating over the rules until it finds the plan with the lowest cost.
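
      A minimal sketch of the rule-based side: a HepPlanner that applies the classic filter-into-join push-down rule to an existing RelNode tree ("relNode" is assumed to be an unoptimized logical plan obtained elsewhere):

      import org.apache.calcite.plan.hep.HepPlanner;
      import org.apache.calcite.plan.hep.HepProgram;
      import org.apache.calcite.plan.hep.HepProgramBuilder;
      import org.apache.calcite.rel.RelNode;
      import org.apache.calcite.rel.rules.FilterJoinRule;

      public class HepPlannerSketch {
          public static RelNode pushFiltersIntoJoins(RelNode relNode) {
              HepProgram program = new HepProgramBuilder()
                      .addRuleInstance(FilterJoinRule.FILTER_ON_JOIN)   // push a Filter below/into a Join
                      .build();
              HepPlanner hepPlanner = new HepPlanner(program);
              hepPlanner.setRoot(relNode);
              // rules are applied until there are no more matches (or the match limit is reached)
              return hepPlanner.findBestExp();
          }
      }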

      Cost-based optimization (CBO)

      A cost-based optimizer (CBO) transforms relational expressions according to optimization rules. Each transformation takes one relational expression and produces another, while the original expression is also kept, so after a series of transformations there are multiple candidate execution plans. The CBO then computes the cost of each plan using statistics and a cost model and picks the plan with the smallest cost.

      CBO therefore has two dependencies: statistics and the cost model. Whether the statistics are accurate and whether the cost model is reasonable both affect whether CBO selects the optimal plan. From the description above, CBO is superior to RBO: RBO only follows rules and is insensitive to the data, while in practice the data keeps changing, so a plan produced by RBO may well not be optimal. In fact, most databases and big-data engines today prefer CBO. For streaming engines, however, CBO is still difficult to apply, because information such as data volume cannot be known in advance, which greatly limits the optimization quality; CBO is therefore mainly used in offline (batch) scenarios.

      Concepts related to the VolcanoPlanner

      The VolcanoPlanner is the CBO implementation; it keeps iterating over rules until it finds the lowest-cost plan. Some of the related concepts:

      • RelSet describes a group of equivalent relational expressions; all of its RelNodes are recorded in rels;
      • RelSubset describes a group of equivalent relational expressions that share the same physical properties. Each RelSubset records the RelSet it belongs to. RelSubset extends AbstractRelNode, so it is itself a RelNode whose physical properties are recorded in its traitSet member. Each RelSubset also records its best plan (best) and the cost of that best plan (bestCost).
      • RuleMatch is an abstraction of the relationship between a Rule and a RelSubset; it records information about both.
      • importance determines the order in which rules are applied during optimization. It is a relative notion; the VolcanoPlanner has two kinds of importance, one for RelSubsets and one for RuleMatches.

      VolcanoPlanner execution steps

      Applying the VolcanoPlanner consists of four steps overall:

      1. Initialize the VolcanoPlanner and add the corresponding rule matches (including ConverterRules) to the rule match queue;
      2. Perform equivalence transformations on the RelNode: apply rule matches to transform the plan graph (a rule specifies an operator sub-graph to match and the logic to generate an equivalent, better sub-graph); at this point only the physical properties (the Convention) are changed;
      3. Register the RelNode via the VolcanoPlanner's setRoot() method and perform the corresponding initialization;
      4. Iterate using the dynamic-programming algorithm until the cost no longer changes or all rule matches in the queue have been applied; this finds the lowest-cost plan. The importance of a rule match depends on the RelNode's cost and depth.

      The following example walks through the internal logic of the VolcanoPlanner in detail.

      //1. Initialize the VolcanoPlanner object and add the relevant rules
      VolcanoPlanner planner = new VolcanoPlanner();
      planner.addRelTraitDef(ConventionTraitDef.INSTANCE);
      planner.addRelTraitDef(RelDistributionTraitDef.INSTANCE);
      // add the relevant rules
      planner.addRule(FilterJoinRule.FilterIntoJoinRule.FILTER_ON_JOIN);
      planner.addRule(ReduceExpressionsRule.PROJECT_INSTANCE);
      planner.addRule(PruneEmptyRules.PROJECT_INSTANCE);
      // add the relevant ConverterRules
      planner.addRule(EnumerableRules.ENUMERABLE_MERGE_JOIN_RULE);
      planner.addRule(EnumerableRules.ENUMERABLE_SORT_RULE);
      planner.addRule(EnumerableRules.ENUMERABLE_VALUES_RULE);
      planner.addRule(EnumerableRules.ENUMERABLE_PROJECT_RULE);
      planner.addRule(EnumerableRules.ENUMERABLE_FILTER_RULE);
      //2. Changes a relational expression to an equivalent one with a different set of traits.
      RelTraitSet desiredTraits =
          relNode.getCluster().traitSet().replace(EnumerableConvention.INSTANCE);
      relNode = planner.changeTraits(relNode, desiredTraits);
      //3. Register the RelNode through VolcanoPlanner's setRoot method and perform the corresponding initialization
      planner.setRoot(relNode);
      //4. Find the lowest-cost plan via the dynamic-programming algorithm
      relNode = planner.findBestExp();
      

      The related Flink code is as follows:

      public PlannerContext(
      			TableConfig tableConfig,
      			FunctionCatalog functionCatalog,
      			CatalogManager catalogManager,
      			CalciteSchema rootSchema,
      			List<RelTraitDef> traitDefs) {
      		this.tableConfig = tableConfig;
      
      		this.context = new FlinkContextImpl(
      				tableConfig,
      				functionCatalog,
      				catalogManager,
      				this::createSqlExprToRexConverter);
      
      		this.rootSchema = rootSchema;
      		this.traitDefs = traitDefs;
      		// Make a framework config to initialize the RelOptCluster instance,
      		// caution that we can only use the attributes that can not be overwrite/configured
      		// by user.
      		this.frameworkConfig = createFrameworkConfig();
      
          // The VolcanoPlanner is used here
      		RelOptPlanner planner = new VolcanoPlanner(frameworkConfig.getCostFactory(), frameworkConfig.getContext());
      		planner.setExecutor(frameworkConfig.getExecutor());
      		for (RelTraitDef traitDef : frameworkConfig.getTraitDefs()) {
      			planner.addRelTraitDef(traitDef);
      		}
      		this.cluster = FlinkRelOptClusterFactory.create(planner, new RexBuilder(typeFactory));
      	}
      
      
      // Initialization
      <init>:119, PlannerContext (org.apache.Flink.table.planner.delegation)
      <init>:86, PlannerBase (org.apache.Flink.table.planner.delegation)
      <init>:44, StreamPlanner (org.apache.Flink.table.planner.delegation)
      create:50, BlinkPlannerFactory (org.apache.Flink.table.planner.delegation)
      create:325, StreamTableEnvironmentImpl$ (org.apache.Flink.table.api.scala.internal)
      create:425, StreamTableEnvironment$ (org.apache.Flink.table.api.scala)
      main:56, StreamSQLExample$ (spendreport)
      main:-1, StreamSQLExample (spendreport)
      
      
      class FlinkVolcanoProgram[OC <: FlinkOptimizeContext] extends FlinkRuleSetProgram[OC] {
      
       override def optimize(root: RelNode, context: OC): RelNode = {
          val targetTraits = root.getTraitSet.plusAll(requiredOutputTraits.get).simplify()
          // VolcanoPlanner limits that the planer a RelNode tree belongs to and
          // the VolcanoPlanner used to optimize the RelNode tree should be same instance.
          // see: VolcanoPlanner#registerImpl
          // here, use the planner in cluster directly
            
          // The VolcanoPlanner is also used here
          val planner = root.getCluster.getPlanner.asInstanceOf[VolcanoPlanner]
          val optProgram = Programs.ofRules(rules)
        }
      }      
        
      // The corresponding call stack
      
      optimize:60, FlinkVolcanoProgram (org.apache.Flink.table.planner.plan.optimize.program)
      apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program)
      apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program)
      apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
      apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
      foreach:891, Iterator$class (scala.collection)
      foreach:1334, AbstractIterator (scala.collection)
      foreach:72, IterableLike$class (scala.collection)
      foreach:54, AbstractIterable (scala.collection)
      foldLeft:157, TraversableOnce$class (scala.collection)
      foldLeft:104, AbstractTraversable (scala.collection)
      optimize:57, FlinkChainedProgram (org.apache.Flink.table.planner.plan.optimize.program)
      optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
      doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
      optimize:77, CommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
      optimize:248, PlannerBase (org.apache.Flink.table.planner.delegation)
      translate:151, PlannerBase (org.apache.Flink.table.planner.delegation)
      toDataStream:210, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
      toAppendStream:107, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
      toAppendStream:101, TableConversions (org.apache.Flink.table.api.scala)
      main:89, StreamSQLExample$ (spendreport)
      main:-1, StreamSQLExample (spendreport)
      
      // Everything below is VolcanoPlanner-related code and call stacks
        
      // The VolcanoPlanner adds rules; the selected optimization rules are wrapped into VolcanoRuleMatch objects and put into the RuleQueue, and this RuleQueue is exactly the core class used by the dynamic-programming algorithm executed next.
      public class VolcanoPlanner extends AbstractRelOptPlanner {
            public boolean addRule(RelOptRule rule) {
              ......
            }
      }    
              
      addRule:438, VolcanoPlanner (org.apache.calcite.plan.volcano)
      run:315, Programs$RuleSetProgram (org.apache.calcite.tools)
      optimize:64, FlinkVolcanoProgram (org.apache.Flink.table.planner.plan.optimize.program)
      apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program)
      apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program)
      apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
      apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
      foreach:891, Iterator$class (scala.collection)
      foreach:1334, AbstractIterator (scala.collection)
      foreach:72, IterableLike$class (scala.collection)
      foreach:54, AbstractIterable (scala.collection)
      foldLeft:157, TraversableOnce$class (scala.collection)
      foldLeft:104, AbstractTraversable (scala.collection)
      optimize:57, FlinkChainedProgram (org.apache.Flink.table.planner.plan.optimize.program)
      optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
      doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
      optimize:77, CommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
      optimize:248, PlannerBase (org.apache.Flink.table.planner.delegation)
      translate:151, PlannerBase (org.apache.Flink.table.planner.delegation)
      toDataStream:210, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
      toAppendStream:107, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
      toAppendStream:101, TableConversions (org.apache.Flink.table.api.scala)
      main:89, StreamSQLExample$ (spendreport)
      main:-1, StreamSQLExample (spendreport)        
       
      // The VolcanoPlanner changes the traits
      public class VolcanoPlanner extends AbstractRelOptPlanner {
          public RelNode changeTraits(RelNode rel, RelTraitSet toTraits) {
              assert !rel.getTraitSet().equals(toTraits);
      
              assert toTraits.allSimple();
      
              RelSubset rel2 = this.ensureRegistered(rel, (RelNode)null);
              return rel2.getTraitSet().equals(toTraits) ? rel2 : rel2.set.getOrCreateSubset(rel.getCluster(), toTraits.simplify());
          }
      }
      
      changeTraits:529, VolcanoPlanner (org.apache.calcite.plan.volcano)
      run:324, Programs$RuleSetProgram (org.apache.calcite.tools)
      optimize:64, FlinkVolcanoProgram (org.apache.Flink.table.planner.plan.optimize.program)
      apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program)
      apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program)
      apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
      apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
      foreach:891, Iterator$class (scala.collection)
      foreach:1334, AbstractIterator (scala.collection)
      foreach:72, IterableLike$class (scala.collection)
      foreach:54, AbstractIterable (scala.collection)
      foldLeft:157, TraversableOnce$class (scala.collection)
      foldLeft:104, AbstractTraversable (scala.collection)
      optimize:57, FlinkChainedProgram (org.apache.Flink.table.planner.plan.optimize.program)
      optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
      doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
      optimize:77, CommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
      optimize:248, PlannerBase (org.apache.Flink.table.planner.delegation)
      translate:151, PlannerBase (org.apache.Flink.table.planner.delegation)
      toDataStream:210, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
      toAppendStream:107, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
      toAppendStream:101, TableConversions (org.apache.Flink.table.api.scala)
      main:89, StreamSQLExample$ (spendreport)
      main:-1, StreamSQLExample (spendreport)
        
      // The VolcanoPlanner sets the root
      public class VolcanoPlanner extends AbstractRelOptPlanner {  
          public void setRoot(RelNode rel) {
              this.registerMetadataRels();
              this.root = this.registerImpl(rel, (RelSet)null);
              if (this.originalRoot == null) {
                  this.originalRoot = rel;
              }
      
              this.ruleQueue.recompute(this.root);
              this.ensureRootConverters();
          }
      }
      
      setRoot:294, VolcanoPlanner (org.apache.calcite.plan.volcano)
      run:326, Programs$RuleSetProgram (org.apache.calcite.tools)
      optimize:64, FlinkVolcanoProgram (org.apache.Flink.table.planner.plan.optimize.program)
      apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program)
      apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program)
      apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
      apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
      foreach:891, Iterator$class (scala.collection)
      foreach:1334, AbstractIterator (scala.collection)
      foreach:72, IterableLike$class (scala.collection)
      foreach:54, AbstractIterable (scala.collection)
      foldLeft:157, TraversableOnce$class (scala.collection)
      foldLeft:104, AbstractTraversable (scala.collection)
      optimize:57, FlinkChainedProgram (org.apache.Flink.table.planner.plan.optimize.program)
      optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
      doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
      optimize:77, CommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
      optimize:248, PlannerBase (org.apache.Flink.table.planner.delegation)
      translate:151, PlannerBase (org.apache.Flink.table.planner.delegation)
      toDataStream:210, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
      toAppendStream:107, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
      toAppendStream:101, TableConversions (org.apache.Flink.table.api.scala)
      main:89, StreamSQLExample$ (spendreport)
      main:-1, StreamSQLExample (spendreport)
        
      // The VolcanoPlanner finds the minimum cost; essentially this is an implementation of a dynamic-programming algorithm.
        
      public class VolcanoPlanner extends AbstractRelOptPlanner {
          public RelNode findBestExp() {
              this.ensureRootConverters();
              this.registerMaterializations();
              int cumulativeTicks = 0;
              VolcanoPlannerPhase[] var2 = VolcanoPlannerPhase.values();
              int var3 = var2.length;
      
              for(int var4 = 0; var4 < var3; ++var4) {
                  VolcanoPlannerPhase phase = var2[var4];
                  this.setInitialImportance();
                  RelOptCost targetCost = this.costFactory.makeHugeCost();
                  int tick = 0;
                  int firstFiniteTick = -1;
                  int splitCount = 0;
                  int giveUpTick = 2147483647;
      
                  while(true) {
                      ++tick;
                      ++cumulativeTicks;
                      if (this.root.bestCost.isLe(targetCost)) {
                          if (firstFiniteTick < 0) {
                              firstFiniteTick = cumulativeTicks;
                              this.clearImportanceBoost();
                          }
      
                          if (!this.ambitious) {
                              break;
                          }
      
                          targetCost = this.root.bestCost.multiplyBy(0.9D);
                          ++splitCount;
                          if (this.impatient) {
                              if (firstFiniteTick < 10) {
                                  giveUpTick = cumulativeTicks + 25;
                              } else {
                                  giveUpTick = cumulativeTicks + Math.max(firstFiniteTick / 10, 25);
                              }
                          }
                      } else {
                          if (cumulativeTicks > giveUpTick) {
                              break;
                          }
      
                          if (this.root.bestCost.isInfinite() && tick % 10 == 0) {
                              this.injectImportanceBoost();
                          }
                      }
      
                      VolcanoRuleMatch match = this.ruleQueue.popMatch(phase);
                      if (match == null) {
                          break;
                      }
      
                      assert match.getRule().matches(match);
      
                      match.onMatch();
                      this.root = this.canonize(this.root);
                  }
      
                  this.ruleQueue.phaseCompleted(phase);
              }
      
              RelNode cheapest = this.root.buildCheapestPlan(this);
      
              return cheapest;
          }
      }
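
To connect the call stack above with the code, here is a minimal sketch of how a program such as FlinkVolcanoProgram drives the VolcanoPlanner: register the root with the desired traits, then ask for the cheapest equivalent expression. This is not Flink's literal code; `relNode` here stands for an already-built, unoptimized RelNode tree.

import org.apache.calcite.plan.volcano.VolcanoPlanner
import org.apache.calcite.rel.RelNode
import org.apache.flink.table.planner.plan.nodes.FlinkConventions

// Minimal sketch; `relNode` is assumed to be the unoptimized logical plan.
val planner = relNode.getCluster.getPlanner.asInstanceOf[VolcanoPlanner]
// the trait set we want the result to satisfy, e.g. Flink's LOGICAL convention
val targetTraits = relNode.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
// register the root; changeTraits wraps it in a RelSubset carrying the desired traits
planner.setRoot(planner.changeTraits(relNode, targetTraits))
// run the dynamic-programming style search implemented by findBestExp above
val cheapest: RelNode = planner.findBestExp()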
      
// The Flink logical node `cheapest` returned by VolcanoPlanner is the finally chosen node.
      
      cheapest = {FlinkLogicalUnion@6487} "FlinkLogicalUnion#443"
       cluster = {FlinkRelOptCluster@6224} 
       inputs = {RegularImmutableList@6493}  size = 2
        0 = {FlinkLogicalCalc@6498} "FlinkLogicalCalc#441"
         cluster = {FlinkRelOptCluster@6224} 
         calcProgram = {RexProgram@6509} "(expr#0..2=[{inputs}], expr#3=[2], expr#4=[>($t2, $t3)], proj#0..2=[{exprs}], $condition=[$t4])"
         program = {RexProgram@6509} "(expr#0..2=[{inputs}], expr#3=[2], expr#4=[>($t2, $t3)], proj#0..2=[{exprs}], $condition=[$t4])"
         input = {FlinkLogicalDataStreamTableScan@6510} "rel#437:FlinkLogicalDataStreamTableScan.LOGICAL.any.None: 0.false.UNKNOWN(table=[default_catalog, default_database, UnnamedTable$0])"
         desc = "FlinkLogicalCalc#441"
         rowType = {RelRecordType@6504} "RecordType(BIGINT user, VARCHAR(2147483647) product, INTEGER amount)"
         digest = "FlinkLogicalCalc#441"
         AbstractRelNode.cluster = {FlinkRelOptCluster@6224} 
         id = 441
         traitSet = {RelTraitSet@5942}  size = 5
        1 = {FlinkLogicalCalc@6499} "FlinkLogicalCalc#442"
         cluster = {FlinkRelOptCluster@6224} 
         calcProgram = {RexProgram@6502} "(expr#0..2=[{inputs}], expr#3=[2], expr#4=[<($t2, $t3)], proj#0..2=[{exprs}], $condition=[$t4])"
         program = {RexProgram@6502} "(expr#0..2=[{inputs}], expr#3=[2], expr#4=[<($t2, $t3)], proj#0..2=[{exprs}], $condition=[$t4])"
         input = {FlinkLogicalDataStreamTableScan@6503} "rel#435:FlinkLogicalDataStreamTableScan.LOGICAL.any.None: 0.false.UNKNOWN(table=[default_catalog, default_database, OrderB])"
         desc = "FlinkLogicalCalc#442"
         rowType = {RelRecordType@6504} "RecordType(BIGINT user, VARCHAR(2147483647) product, INTEGER amount)"
         digest = "FlinkLogicalCalc#442"
         AbstractRelNode.cluster = {FlinkRelOptCluster@6224} 
         id = 442
         traitSet = {RelTraitSet@5942}  size = 5
       kind = {SqlKind@6494} "UNION"
        lowerName = "union"
        sql = "UNION"
        name = "UNION"
        ordinal = 18
       all = true
       desc = "FlinkLogicalUnion#443"
       rowType = null
       digest = "FlinkLogicalUnion#443"
       AbstractRelNode.cluster = {FlinkRelOptCluster@6224} 
       id = 443
       traitSet = {RelTraitSet@5942}  size = 5
      
      
findBestExp:572, VolcanoPlanner (org.apache.calcite.plan.volcano)
run:327, Programs$RuleSetProgram (org.apache.calcite.tools)
optimize:64, FlinkVolcanoProgram (org.apache.flink.table.planner.plan.optimize.program)
apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
foreach:891, Iterator$class (scala.collection)
foreach:1334, AbstractIterator (scala.collection)
foreach:72, IterableLike$class (scala.collection)
foreach:54, AbstractIterable (scala.collection)
foldLeft:157, TraversableOnce$class (scala.collection)
foldLeft:104, AbstractTraversable (scala.collection)
optimize:57, FlinkChainedProgram (org.apache.flink.table.planner.plan.optimize.program)
optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
optimize:77, CommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
optimize:248, PlannerBase (org.apache.flink.table.planner.delegation)
translate:151, PlannerBase (org.apache.flink.table.planner.delegation)
toDataStream:210, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toAppendStream:107, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toAppendStream:101, TableConversions (org.apache.flink.table.api.scala)
main:89, StreamSQLExample$ (spendreport)
main:-1, StreamSQLExample (spendreport)
      

The following shows the optimization of the Join.

class FlinkLogicalJoin(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    left: RelNode,
    right: RelNode,
    condition: RexNode,
    joinType: JoinRelType)
  extends FlinkLogicalJoinBase(cluster, traitSet, left, right, condition, joinType)

// The ConverterRule that rewrites a LogicalJoin into a FlinkLogicalJoin (LOGICAL convention)
class FlinkLogicalJoinConverter
  extends ConverterRule(
    classOf[LogicalJoin],
    Convention.NONE,
    FlinkConventions.LOGICAL,
    "FlinkLogicalJoinConverter") {

  override def convert(rel: RelNode): RelNode = {
    val join = rel.asInstanceOf[LogicalJoin]
    val traitSet = rel.getTraitSet.replace(FlinkConventions.LOGICAL)
    val newLeft = RelOptRule.convert(join.getLeft, FlinkConventions.LOGICAL)
    val newRight = RelOptRule.convert(join.getRight, FlinkConventions.LOGICAL)

    new FlinkLogicalJoin(
      rel.getCluster,
      traitSet,
      newLeft,
      newRight,
      join.getCondition,
      join.getJoinType)
  }
}
      
      call = {VolcanoRuleMatch@6191} "rule [FlinkLogicalJoinConverter(in:NONE,out:LOGICAL)] rels [rel#100:LogicalJoin.NONE.any.None: 0.false.UNKNOWN(left=RelSubset#98,right=RelSubset#99,condition==($0, $2),joinType=left)]"
       targetSet = {RelSet@6193} 
       targetSubset = null
       digest = "rule [FlinkLogicalJoinConverter(in:NONE,out:LOGICAL)] rels [rel#100:LogicalJoin.NONE.any.None: 0.false.UNKNOWN(left=RelSubset#98,right=RelSubset#99,condition==($0, $2),joinType=left)]"
       cachedImportance = 0.8019000000000001
       volcanoPlanner = {VolcanoPlanner@6194} 
       generatedRelList = null
       id = 71
       operand0 = {RelOptRule$ConverterRelOptRuleOperand@6186} 
        parent = null
        rule = {FlinkLogicalJoinConverter@6179} "FlinkLogicalJoinConverter(in:NONE,out:LOGICAL)"
        predicate = {ConverterRule$lambda@6246} 
        solveOrder = {int[1]@6247} 
        ordinalInParent = 0
        ordinalInRule = 0
        trait = {Convention$Impl@6184} "NONE"
        clazz = {Class@5010} "class org.apache.calcite.rel.logical.LogicalJoin"
        children = {RegularImmutableList@6230}  size = 0
        childPolicy = {RelOptRuleOperandChildPolicy@6248} "ANY"
       nodeInputs = {RegularImmutableBiMap@6195}  size = 0
       rule = {FlinkLogicalJoinConverter@6179} "FlinkLogicalJoinConverter(in:NONE,out:LOGICAL)"
       rels = {RelNode[1]@6196} 
        0 = {LogicalJoin@6181} "rel#100:LogicalJoin.NONE.any.None: 0.false.UNKNOWN(left=RelSubset#98,right=RelSubset#99,condition==($0, $2),joinType=left)"
         semiJoinDone = false
         systemFieldList = {RegularImmutableList@6230}  size = 0
         condition = {RexCall@6231} "=($0, $2)"
         variablesSet = {RegularImmutableSet@6232}  size = 0
         joinType = {JoinRelType@6233} "LEFT"
         joinInfo = {JoinInfo@6234} 
         left = {RelSubset@6235} "rel#98:Subset#0.NONE.any.None: 0.false.UNKNOWN"
         right = {RelSubset@6236} "rel#99:Subset#1.NONE.any.None: 0.false.UNKNOWN"
         desc = "rel#100:LogicalJoin.NONE.any.None: 0.false.UNKNOWN(left=RelSubset#98,right=RelSubset#99,condition==($0, $2),joinType=left)"
         rowType = {RelRecordType@6237} "RecordType(VARCHAR(2147483647) orderId, VARCHAR(2147483647) productName, VARCHAR(2147483647) orderId0, VARCHAR(2147483647) payType)"
         digest = "LogicalJoin.NONE.any.None: 0.false.UNKNOWN(left=RelSubset#98,right=RelSubset#99,condition==($0, $2),joinType=left)"
         cluster = {FlinkRelOptCluster@6239} 
         id = 100
         traitSet = {RelTraitSet@6240}  size = 5
       planner = {VolcanoPlanner@6194} 
       parents = null  
        
// Call stack at creation time
         
      create:106, FlinkLogicalJoin$ (org.apache.flink.table.planner.plan.nodes.logical)
      convert:92, FlinkLogicalJoinConverter (org.apache.flink.table.planner.plan.nodes.logical)
      onMatch:144, ConverterRule (org.apache.calcite.rel.convert)
      onMatch:208, VolcanoRuleCall (org.apache.calcite.plan.volcano)
      findBestExp:631, VolcanoPlanner (org.apache.calcite.plan.volcano)
      run:327, Programs$RuleSetProgram (org.apache.calcite.tools)
      optimize:64, FlinkVolcanoProgram (org.apache.flink.table.planner.plan.optimize.program)
      apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
      apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
      apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
      apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
      foreach:891, Iterator$class (scala.collection)
      foreach:1334, AbstractIterator (scala.collection)
      foreach:72, IterableLike$class (scala.collection)
      foreach:54, AbstractIterable (scala.collection)
      foldLeft:157, TraversableOnce$class (scala.collection)
      foldLeft:104, AbstractTraversable (scala.collection)
      optimize:57, FlinkChainedProgram (org.apache.flink.table.planner.plan.optimize.program)
      optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
      doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
      optimize:77, CommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
      optimize:248, PlannerBase (org.apache.flink.table.planner.delegation)
      translate:151, PlannerBase (org.apache.flink.table.planner.delegation)
      toDataStream:210, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
      toRetractStream:127, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
      toRetractStream:146, TableConversions (org.apache.flink.table.api.scala)
      main:75, SimpleOuterJoin$ (spendreport)
      main:-1, SimpleOuterJoin (spendreport)
        
      abstract class FlinkLogicalJoinBase(  
          cluster: RelOptCluster,
          traitSet: RelTraitSet,
          left: RelNode,
          right: RelNode,
          condition: RexNode,
          joinType: JoinRelType)
        extends Join(
          cluster,
          traitSet,
          left,
          right,
          condition,
          Set.empty[CorrelationId].asJava,
          joinType)
        with FlinkLogicalRel {
          
  // the cost is also computed here
        override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
          val leftRowCnt = mq.getRowCount(getLeft)
          val leftRowSize = mq.getAverageRowSize(getLeft)
          val rightRowCnt = mq.getRowCount(getRight)
      
          joinType match {
            case JoinRelType.SEMI | JoinRelType.ANTI =>
              val rightRowSize = mq.getAverageRowSize(getRight)
              val ioCost = (leftRowCnt * leftRowSize) + (rightRowCnt * rightRowSize)
              val cpuCost = leftRowCnt + rightRowCnt
              val rowCnt = leftRowCnt + rightRowCnt
              planner.getCostFactory.makeCost(rowCnt, cpuCost, ioCost)
            case _ =>
              val cpuCost = leftRowCnt + rightRowCnt
              val ioCost = (leftRowCnt * leftRowSize) + rightRowCnt
              planner.getCostFactory.makeCost(leftRowCnt, cpuCost, ioCost)
          }
        }  
      }  
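
To make the default (non-SEMI/ANTI) branch of this cost formula concrete, here is a tiny worked example. The row counts and row sizes are purely hypothetical numbers, not taken from any real run.

// Hypothetical statistics: left side 1000 rows, 32 bytes per row on average; right side 500 rows.
val leftRowCnt  = 1000.0
val leftRowSize = 32.0
val rightRowCnt = 500.0

val cpuCost = leftRowCnt + rightRowCnt                  // 1500.0
val ioCost  = leftRowCnt * leftRowSize + rightRowCnt    // 32500.0
// planner.getCostFactory.makeCost(leftRowCnt, cpuCost, ioCost)  =>  cost(1000.0, 1500.0, 32500.0)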
        
// Call stack
        
      computeSelfCost:63, FlinkLogicalJoin (org.apache.flink.table.planner.plan.nodes.logical)
      getNonCumulativeCost:41, FlinkRelMdNonCumulativeCost (org.apache.flink.table.planner.plan.metadata)
      getNonCumulativeCost_$:-1, GeneratedMetadataHandler_NonCumulativeCost
      getNonCumulativeCost:-1, GeneratedMetadataHandler_NonCumulativeCost
      getNonCumulativeCost:301, RelMetadataQuery (org.apache.calcite.rel.metadata)
      getCost:936, VolcanoPlanner (org.apache.calcite.plan.volcano)
      propagateCostImprovements0:347, RelSubset (org.apache.calcite.plan.volcano)
      propagateCostImprovements:330, RelSubset (org.apache.calcite.plan.volcano)
      addRelToSet:1828, VolcanoPlanner (org.apache.calcite.plan.volcano)
      registerImpl:1764, VolcanoPlanner (org.apache.calcite.plan.volcano)
      register:846, VolcanoPlanner (org.apache.calcite.plan.volcano)
      ensureRegistered:868, VolcanoPlanner (org.apache.calcite.plan.volcano)
      ensureRegistered:1939, VolcanoPlanner (org.apache.calcite.plan.volcano)
      transformTo:129, VolcanoRuleCall (org.apache.calcite.plan.volcano)
      transformTo:236, RelOptRuleCall (org.apache.calcite.plan)
      onMatch:146, ConverterRule (org.apache.calcite.rel.convert)
      onMatch:208, VolcanoRuleCall (org.apache.calcite.plan.volcano)
      findBestExp:631, VolcanoPlanner (org.apache.calcite.plan.volcano)
      run:327, Programs$RuleSetProgram (org.apache.calcite.tools)
      optimize:64, FlinkVolcanoProgram (org.apache.flink.table.planner.plan.optimize.program)
      apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
      apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
      apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
      apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
      foreach:891, Iterator$class (scala.collection)
      foreach:1334, AbstractIterator (scala.collection)
      foreach:72, IterableLike$class (scala.collection)
      foreach:54, AbstractIterable (scala.collection)
      foldLeft:157, TraversableOnce$class (scala.collection)
      foldLeft:104, AbstractTraversable (scala.collection)
      optimize:57, FlinkChainedProgram (org.apache.flink.table.planner.plan.optimize.program)
      optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
      doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
      optimize:77, CommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
      optimize:248, PlannerBase (org.apache.flink.table.planner.delegation)
      translate:151, PlannerBase (org.apache.flink.table.planner.delegation)
      toDataStream:210, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
      toRetractStream:127, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
      toRetractStream:146, TableConversions (org.apache.flink.table.api.scala)
      main:75, SimpleOuterJoin$ (spendreport)
      main:-1, SimpleOuterJoin (spendreport)  
      

Optimization rules

Calcite optimizes these Logical Plans with optimization rules; depending on the execution environment, different rule sets are applied (Flink provides one set of rules for batch and one for streaming).

The optimization rules fall into two categories: the built-in rules provided by Calcite (such as predicate push-down and pruning), and the rules that convert Logical Nodes into Flink Nodes.

Both steps belong to Calcite's optimization phase. The resulting DataStream Plan encapsulates how each node is translated into the corresponding DataStream / DataSet program: the different DataStream/DataSet Nodes are turned into the final executable DataStream/DataSet program via code generation (CodeGen).

The rule sets below list these Rules; each rule produces a corresponding physical node, and inside such a node the execution steps of the SQL produced by Calcite are code-generated into the executable Function code of the DataSet (or DataStream) program.

package org.apache.flink.table.plan.rules
        
        /**
          * RuleSet to optimize plans for batch / DataSet execution
          */
        val DATASET_OPT_RULES: RuleSet = RuleSets.ofList(
          // translate to Flink DataSet nodes
          DataSetWindowAggregateRule.INSTANCE,
          DataSetAggregateRule.INSTANCE,
          DataSetDistinctRule.INSTANCE,
          DataSetCalcRule.INSTANCE,
          DataSetPythonCalcRule.INSTANCE,
          DataSetJoinRule.INSTANCE,
          DataSetSingleRowJoinRule.INSTANCE,
          DataSetScanRule.INSTANCE,
          DataSetUnionRule.INSTANCE,
          DataSetIntersectRule.INSTANCE,
          DataSetMinusRule.INSTANCE,
          DataSetSortRule.INSTANCE,
          DataSetValuesRule.INSTANCE,
          DataSetCorrelateRule.INSTANCE,
          BatchTableSourceScanRule.INSTANCE
        )
        
         /**
          * RuleSet to optimize plans for stream / DataStream execution
          */
        val DATASTREAM_OPT_RULES: RuleSet = RuleSets.ofList(
          // translate to DataStream nodes
          DataStreamSortRule.INSTANCE,
          DataStreamGroupAggregateRule.INSTANCE,
          DataStreamOverAggregateRule.INSTANCE,
          DataStreamGroupWindowAggregateRule.INSTANCE,
          DataStreamCalcRule.INSTANCE,
          DataStreamScanRule.INSTANCE,
          DataStreamUnionRule.INSTANCE,
          DataStreamValuesRule.INSTANCE,
          DataStreamCorrelateRule.INSTANCE,
          DataStreamWindowJoinRule.INSTANCE,
          DataStreamJoinRule.INSTANCE,
          DataStreamTemporalTableJoinRule.INSTANCE,
          StreamTableSourceScanRule.INSTANCE,
          DataStreamMatchRule.INSTANCE,
          DataStreamTableAggregateRule.INSTANCE,
          DataStreamGroupWindowTableAggregateRule.INSTANCE,
          DataStreamPythonCalcRule.INSTANCE
        )
          
package org.apache.flink.table.planner.plan.rules
      
        /**
          * RuleSet to do physical optimize for stream
          */
        val PHYSICAL_OPT_RULES: RuleSet = RuleSets.ofList(
          FlinkExpandConversionRule.STREAM_INSTANCE,
          // source
          StreamExecDataStreamScanRule.INSTANCE,
          StreamExecTableSourceScanRule.INSTANCE,
          StreamExecIntermediateTableScanRule.INSTANCE,
          StreamExecWatermarkAssignerRule.INSTANCE,
          StreamExecValuesRule.INSTANCE,
          // calc
          StreamExecCalcRule.INSTANCE,
          StreamExecPythonCalcRule.INSTANCE,
          // union
          StreamExecUnionRule.INSTANCE,
          // sort
          StreamExecSortRule.INSTANCE,
          StreamExecLimitRule.INSTANCE,
          StreamExecSortLimitRule.INSTANCE,
          StreamExecTemporalSortRule.INSTANCE,
          // rank
          StreamExecRankRule.INSTANCE,
          StreamExecDeduplicateRule.RANK_INSTANCE,
          // expand
          StreamExecExpandRule.INSTANCE,
          // group agg
          StreamExecGroupAggregateRule.INSTANCE,
          StreamExecGroupTableAggregateRule.INSTANCE,
          // over agg
          StreamExecOverAggregateRule.INSTANCE,
          // window agg
          StreamExecGroupWindowAggregateRule.INSTANCE,
          StreamExecGroupWindowTableAggregateRule.INSTANCE,
          // join
          StreamExecJoinRule.INSTANCE,
          StreamExecWindowJoinRule.INSTANCE,
          StreamExecTemporalJoinRule.INSTANCE,
          StreamExecLookupJoinRule.SNAPSHOT_ON_TABLESCAN,
          StreamExecLookupJoinRule.SNAPSHOT_ON_CALC_TABLESCAN,
          // CEP
          StreamExecMatchRule.INSTANCE,
          // correlate
          StreamExecConstantTableFunctionScanRule.INSTANCE,
          StreamExecCorrelateRule.INSTANCE,
          // sink
          StreamExecSinkRule.INSTANCE
        )
      
      StreamExecUnionRule

A concrete Rule example, here the Rule for Union:

package org.apache.flink.table.planner.plan.rules.physical.stream
      
      class StreamExecUnionRule
        extends ConverterRule(
          classOf[FlinkLogicalUnion],
          FlinkConventions.LOGICAL,
          FlinkConventions.STREAM_PHYSICAL,
          "StreamExecUnionRule") {
        
        def convert(rel: RelNode): RelNode = {
          val union: FlinkLogicalUnion = rel.asInstanceOf[FlinkLogicalUnion]
          val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
          val newInputs = union.getInputs.map(RelOptRule.convert(_, FlinkConventions.STREAM_PHYSICAL))
      
    // This rule produces a corresponding physical node. Inside that node, the execution steps of
    // the SQL generated by Calcite are code-generated into the stream's executable Function code.
    new StreamExecUnion(
      rel.getCluster,
      traitSet,
      newInputs,
      union.all,
      rel.getRowType)
  }
}
      
      public class VolcanoPlanner extends AbstractRelOptPlanner {
          public RelNode findBestExp() {
             // this is where a matched rule is invoked (abbreviated from the full method above)
                   match.onMatch();
              return cheapest;
          }
      }
      
      match = {VolcanoRuleMatch@6252} "rule [StreamExecUnionRule(in:LOGICAL,out:STREAM_PHYSICAL)] rels [rel#215:FlinkLogicalUnion.LOGICAL.any.None: 0.false.UNKNOWN(input#0=RelSubset#211,input#1=RelSubset#214,all=true)]"
       targetSet = {RelSet@6298} 
       targetSubset = null
       digest = "rule [StreamExecUnionRule(in:LOGICAL,out:STREAM_PHYSICAL)] rels [rel#215:FlinkLogicalUnion.LOGICAL.any.None: 0.false.UNKNOWN(input#0=RelSubset#211,input#1=RelSubset#214,all=true)]"
       cachedImportance = 0.81
       volcanoPlanner = {VolcanoPlanner@6259} 
       generatedRelList = null
       id = 521
       operand0 = {RelOptRule$ConverterRelOptRuleOperand@6247} 
       nodeInputs = {RegularImmutableBiMap@6299}  size = 0
       rule = {StreamExecUnionRule@6241} "StreamExecUnionRule(in:LOGICAL,out:STREAM_PHYSICAL)"
       rels = {RelNode[1]@6300} 
       planner = {VolcanoPlanner@6259} 
       parents = null
      
// Call stack
        
convert:46, StreamExecUnionRule (org.apache.flink.table.planner.plan.rules.physical.stream)
onMatch:144, ConverterRule (org.apache.calcite.rel.convert)
onMatch:208, VolcanoRuleCall (org.apache.calcite.plan.volcano)
findBestExp:631, VolcanoPlanner (org.apache.calcite.plan.volcano)
run:327, Programs$RuleSetProgram (org.apache.calcite.tools)
optimize:64, FlinkVolcanoProgram (org.apache.flink.table.planner.plan.optimize.program)
apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
foreach:891, Iterator$class (scala.collection)
foreach:1334, AbstractIterator (scala.collection)
foreach:72, IterableLike$class (scala.collection)
foreach:54, AbstractIterable (scala.collection)
foldLeft:157, TraversableOnce$class (scala.collection)
foldLeft:104, AbstractTraversable (scala.collection)
optimize:57, FlinkChainedProgram (org.apache.flink.table.planner.plan.optimize.program)
optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
optimize:77, CommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
optimize:248, PlannerBase (org.apache.flink.table.planner.delegation)
translate:151, PlannerBase (org.apache.flink.table.planner.delegation)
toDataStream:210, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toAppendStream:107, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toAppendStream:101, TableConversions (org.apache.flink.table.api.scala)
main:89, StreamSQLExample$ (spendreport)
main:-1, StreamSQLExample (spendreport)
      
      StreamExecJoinRule

Another concrete Rule example, here the optimization of the Join and the creation of StreamExecJoin:

      class StreamExecJoinRule {
          override def onMatch(call: RelOptRuleCall): Unit = {
            val newJoin = new StreamExecJoin(
              join.getCluster,
              providedTraitSet,
              newLeft,
              newRight,
              join.getCondition,
              join.getJoinType)
            call.transformTo(newJoin)
          }
      }
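
The onMatch excerpt above elides how newLeft, newRight and providedTraitSet are built. As the flow diagram notes, this rule only fires for joins whose condition contains no window bounds (window joins are handled by StreamExecWindowJoinRule); a rough sketch of that gating, not the rule's literal source, could look like this:

// Simplified sketch of matches(); `hasWindowBounds` is a hypothetical helper here.
// The real StreamExecJoinRule extracts window bounds (via WindowJoinUtil) and also
// validates how time attributes are used before deciding whether to convert.
override def matches(call: RelOptRuleCall): Boolean = {
  val join: FlinkLogicalJoin = call.rel(0)
  !hasWindowBounds(join.getCondition)   // only plain unbounded stream joins are converted here
}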
      
      newJoin = {StreamExecJoin@6326} "StreamExecJoin#152"
       cluster = {FlinkRelOptCluster@5072} 
       joinType = {JoinRelType@5038} "LEFT"
       LOG = null
       transformation = null
       bitmap$trans$0 = false
       CommonPhysicalJoin.joinType = {JoinRelType@5038} "LEFT"
       filterNulls = null
       keyPairs = null
       flinkJoinType = null
       inputRowType = null
       bitmap$0 = 0
       condition = {RexCall@5041} "=($0, $2)"
       variablesSet = {RegularImmutableSet@6342}  size = 0
       Join.joinType = {JoinRelType@5038} "LEFT"
       joinInfo = {JoinInfo@6343} 
       left = {RelSubset@6328} "rel#150:Subset#5.STREAM_PHYSICAL.hash[0]true.None: 0.false.UNKNOWN"
        bestCost = {FlinkCost$$anon$1@6344} "{inf}"
        set = {RelSet@6348} 
        best = null
        timestamp = 0
        boosted = false
        desc = "rel#150:Subset#5.STREAM_PHYSICAL.hash[0]true.None: 0.false.UNKNOWN"
        rowType = {RelRecordType@6349} "RecordType(VARCHAR(2147483647) orderId, VARCHAR(2147483647) productName)"
        digest = "Subset#5.STREAM_PHYSICAL.hash[0]true.None: 0.false.UNKNOWN"
        cluster = {FlinkRelOptCluster@5072} 
        id = 150
        traitSet = {RelTraitSet@6336}  size = 5
       right = {RelSubset@6329} "rel#151:Subset#6.STREAM_PHYSICAL.hash[0]true.None: 0.false.UNKNOWN"
        bestCost = {FlinkCost$$anon$1@6344} "{inf}"
        set = {RelSet@6345} 
        best = null
        timestamp = 0
        boosted = false
        desc = "rel#151:Subset#6.STREAM_PHYSICAL.hash[0]true.None: 0.false.UNKNOWN"
        rowType = null
        digest = "Subset#6.STREAM_PHYSICAL.hash[0]true.None: 0.false.UNKNOWN"
        cluster = {FlinkRelOptCluster@5072} 
        id = 151
        traitSet = {RelTraitSet@6336}  size = 5
       desc = "StreamExecJoin#152"
       rowType = null
       digest = "StreamExecJoin#152"
       AbstractRelNode.cluster = {FlinkRelOptCluster@5072} 
       id = 152
       traitSet = {RelTraitSet@6327}  size = 5
      
// Call stack
         
      <init>:58, StreamExecJoin (org.apache.flink.table.planner.plan.nodes.physical.stream)
      onMatch:128, StreamExecJoinRule (org.apache.flink.table.planner.plan.rules.physical.stream)
      onMatch:208, VolcanoRuleCall (org.apache.calcite.plan.volcano)
      findBestExp:631, VolcanoPlanner (org.apache.calcite.plan.volcano)
      run:327, Programs$RuleSetProgram (org.apache.calcite.tools)
      optimize:64, FlinkVolcanoProgram (org.apache.flink.table.planner.plan.optimize.program)
      apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
      apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
      apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
      apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
      foreach:891, Iterator$class (scala.collection)
      foreach:1334, AbstractIterator (scala.collection)
      foreach:72, IterableLike$class (scala.collection)
      foreach:54, AbstractIterable (scala.collection)
      foldLeft:157, TraversableOnce$class (scala.collection)
      foldLeft:104, AbstractTraversable (scala.collection)
      optimize:57, FlinkChainedProgram (org.apache.flink.table.planner.plan.optimize.program)
      optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
      doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
      optimize:77, CommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
      optimize:248, PlannerBase (org.apache.flink.table.planner.delegation)
      translate:151, PlannerBase (org.apache.flink.table.planner.delegation)
      toDataStream:210, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
      toRetractStream:127, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
      toRetractStream:146, TableConversions (org.apache.flink.table.api.scala)
      main:75, SimpleOuterJoin$ (spendreport)
      main:-1, SimpleOuterJoin (spendreport)
      

5. Generating the ExecutionPlan

At this point the overall flow diagram looks like this:

// NOTE : execution order is top to bottom; " -----> " marks the type of the generated instance
* 
*        +-----> "left outer JOIN" (SQL statement)
*        |   
*        |     
*     SqlParser.parseQuery // SQL parsing phase, generates the AST (abstract syntax tree): SQL –> SqlNode
*        |   
*        |      
*        +-----> SqlJoin (SqlNode)
*        |   
*        |     
*     SqlToRelConverter.convertQuery // semantic analysis, generates the logical plan: SqlNode –> RelNode
*        |    
*        |     
*        +-----> LogicalProject (RelNode) // Abstract Syntax Tree, unoptimized RelNode
*        |      
*        |     
*    FlinkLogicalJoinConverter (RelOptRule) // Flink's own optimization rules
*    VolcanoRuleCall.onMatch // optimizes the Logical Plan with Flink-specific rules
*        | 
*        |   
*        +-----> FlinkLogicalJoin (RelNode)  // Optimized Logical Plan, the logical execution plan
*        |  
*        |    
*    StreamExecJoinRule (RelOptRule) // Rule that converts FlinkLogicalJoin without window bounds in join condition to StreamExecJoin
*    VolcanoRuleCall.onMatch // converts the optimized LogicalPlan into Flink's physical execution plan
*        |       
*        |   
*        +-----> StreamExecJoin (FlinkRelNode) // Stream physical RelNode, the physical execution plan
*        |      
*        |   
*    StreamExecJoin.translateToPlanInternal  // generates the StreamOperator, i.e. the Flink operator
*        |     
*        |     
*        +-----> StreamingJoinOperator (StreamOperator) // Streaming unbounded Join operator in StreamTask 
*        |     
*        |  
      

Calcite maps the optimized plan onto the target big data engine; here it is translated into Flink's graph of operators.

This part mainly recursively calls the translateToPlan method of each node; that method uses CodeGen meta-programming to produce the various Flink operators. The result is equivalent to a program written directly against Flink's DataSet or DataStream API.

      class StreamPlanner(
          executor: Executor,
          config: TableConfig,
          functionCatalog: FunctionCatalog,
          catalogManager: CatalogManager)
        extends PlannerBase(executor, config, functionCatalog, catalogManager, isStreamingMode = true) {
        override protected def translateToPlan(
            execNodes: util.List[ExecNode[_, _]]): util.List[Transformation[_]] = {
          execNodes.map {
            case node: StreamExecNode[_] => node.translateToPlan(this)
            case _ =>
              throw new TableException("Cannot generate DataStream due to an invalid logical plan. " +
                "This is a bug and should not happen. Please file an issue.")
          }
        }
      }
      
package org.apache.flink.table.planner.plan.nodes.physical.stream
      
      class StreamExecUnion(
          cluster: RelOptCluster,
          traitSet: RelTraitSet,
          inputRels: util.List[RelNode],
          all: Boolean,
          outputRowType: RelDataType)
        extends Union(cluster, traitSet, inputRels, all)
        with StreamPhysicalRel
        with StreamExecNode[BaseRow] {
      
  // this is where the Flink operator (a Transformation) is generated
        override protected def translateToPlanInternal(
            planner: StreamPlanner): Transformation[BaseRow] = {
          val transformations = getInputNodes.map {
            input => input.translateToPlan(planner).asInstanceOf[Transformation[BaseRow]]
          }
          new UnionTransformation(transformations)
        }
      }
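
The Transformation returned here is not executed immediately. Conceptually (a simplification; the real wiring lives in PlannerBase and the TableEnvironment implementations), every Transformation produced by translateToPlan is registered with the underlying Java StreamExecutionEnvironment, and env.execute() later builds the StreamGraph / JobGraph from them:

// Conceptual sketch only: `javaEnv` stands for the underlying Java StreamExecutionEnvironment,
// `allTransformations` for the Transformation list returned by StreamPlanner.translateToPlan.
allTransformations.foreach(t => javaEnv.addOperator(t))  // register every generated Transformation
val streamGraph = javaEnv.getStreamGraph                 // the StreamGraph is built from those transformations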
      
// Call stack
      
translateToPlanInternal:85, StreamExecUnion (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlanInternal:39, StreamExecUnion (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlan:58, ExecNode$class (org.apache.flink.table.planner.plan.nodes.exec)
translateToPlan:39, StreamExecUnion (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToTransformation:184, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlanInternal:153, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlanInternal:48, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlan:58, ExecNode$class (org.apache.flink.table.planner.plan.nodes.exec)
translateToPlan:48, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
apply:60, StreamPlanner$$anonfun$translateToPlan$1 (org.apache.flink.table.planner.delegation)
apply:59, StreamPlanner$$anonfun$translateToPlan$1 (org.apache.flink.table.planner.delegation)
apply:234, TraversableLike$$anonfun$map$1 (scala.collection)
apply:234, TraversableLike$$anonfun$map$1 (scala.collection)
foreach:891, Iterator$class (scala.collection)
foreach:1334, AbstractIterator (scala.collection)
foreach:72, IterableLike$class (scala.collection)
foreach:54, AbstractIterable (scala.collection)
map:234, TraversableLike$class (scala.collection)
map:104, AbstractTraversable (scala.collection)
translateToPlan:59, StreamPlanner (org.apache.flink.table.planner.delegation)
translate:153, PlannerBase (org.apache.flink.table.planner.delegation)
toDataStream:210, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toAppendStream:107, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toAppendStream:101, TableConversions (org.apache.flink.table.api.scala)
main:89, StreamSQLExample$ (spendreport)
main:-1, StreamSQLExample (spendreport)
      

6. Runtime

Now the flow diagram is complete.

// NOTE : execution order is top to bottom; " -----> " marks the type of the generated instance
* 
*        +-----> "left outer JOIN" (SQL statement)
*        |   
*        |     
*     SqlParser.parseQuery // SQL parsing phase, generates the AST (abstract syntax tree): SQL –> SqlNode
*        |   
*        |      
*        +-----> SqlJoin (SqlNode)
*        |   
*        |     
*     SqlToRelConverter.convertQuery // semantic analysis, generates the logical plan: SqlNode –> RelNode
*        |    
*        |     
*        +-----> LogicalProject (RelNode) // Abstract Syntax Tree, unoptimized RelNode
*        |      
*        |     
*    FlinkLogicalJoinConverter (RelOptRule) // Flink's own optimization rules
*    VolcanoRuleCall.onMatch // optimizes the Logical Plan with Flink-specific rules
*        | 
*        |   
*        +-----> FlinkLogicalJoin (RelNode)  // Optimized Logical Plan, the logical execution plan
*        |  
*        |    
*    StreamExecJoinRule (RelOptRule) // Rule that converts FlinkLogicalJoin without window bounds in join condition to StreamExecJoin
*    VolcanoRuleCall.onMatch // converts the optimized LogicalPlan into Flink's physical execution plan
*        |       
*        |   
*        +-----> StreamExecJoin (FlinkRelNode) // Stream physical RelNode, the physical execution plan
*        |      
*        |     
*    StreamExecJoin.translateToPlanInternal  // generates the StreamOperator, i.e. the Flink operator
*        |     
*        |     
*        +-----> StreamingJoinOperator (StreamOperator) // Streaming unbounded Join operator in StreamTask 
*        |     
*        |       
*    StreamTwoInputProcessor.processRecord1 // the TwoInputStreamTask invokes the StreamingJoinOperator, the actual execution
*        |
*        |  
      

At runtime, the actual processing happens inside a StreamTask, which is the part we are already familiar with. An example call stack:

processElement:150, StreamTaskNetworkInput (org.apache.flink.streaming.runtime.io)
emitNext:128, StreamTaskNetworkInput (org.apache.flink.streaming.runtime.io)
processInput:69, StreamOneInputProcessor (org.apache.flink.streaming.runtime.io)
processInput:311, StreamTask (org.apache.flink.streaming.runtime.tasks)
runDefaultAction:-1, 354713989 (org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$710)
runMailboxLoop:187, MailboxProcessor (org.apache.flink.streaming.runtime.tasks.mailbox)
runMailboxLoop:487, StreamTask (org.apache.flink.streaming.runtime.tasks)
invoke:470, StreamTask (org.apache.flink.streaming.runtime.tasks)
doRun:707, Task (org.apache.flink.runtime.taskmanager)
run:532, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)
      

0x05 Code Example: UNION

The following code shows concretely how the various execution plans are generated.

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala._
      
      object StreamSQLExample {
      
        // *************************************************************************
        //     PROGRAM
        // *************************************************************************
        def main(args: Array[String]): Unit = {
      
          val params = ParameterTool.fromArgs(args)
          val planner = if (params.has("planner")) params.get("planner") else "Flink"
      
          // set up execution environment
          val env = StreamExecutionEnvironment.getExecutionEnvironment
          val tEnv = if (planner == "blink") {  // use blink planner in streaming mode
            val settings = EnvironmentSettings.newInstance()
              .useBlinkPlanner()
              .inStreamingMode()
              .build()
            StreamTableEnvironment.create(env, settings)
          } else if (planner == "Flink") {  // use Flink planner in streaming mode
            StreamTableEnvironment.create(env)
          } else {
            System.err.println("The planner is incorrect. Please run 'StreamSQLExample --planner <planner>', " +
              "where planner (it is either Flink or blink, and the default is Flink) indicates whether the " +
              "example uses Flink planner or blink planner.")
            return
          }
      
          val orderA: DataStream[Order] = env.fromCollection(Seq(
            Order(1L, "beer", 3),
            Order(1L, "diaper", 4),
            Order(3L, "rubber", 2)))
      
          val orderB: DataStream[Order] = env.fromCollection(Seq(
            Order(2L, "pen", 3),
            Order(2L, "rubber", 3),
            Order(4L, "beer", 1)))
      
          // convert DataStream to Table
          val tableA = tEnv.fromDataStream(orderA, 'user, 'product, 'amount)
          // register DataStream as Table
          tEnv.registerDataStream("OrderB", orderB, 'user, 'product, 'amount)
      
          // union the two tables
          val result = tEnv.sqlQuery(
            s"""
               |SELECT * FROM $tableA WHERE amount > 2
               |UNION ALL
               |SELECT * FROM OrderB WHERE amount < 2
              """.stripMargin)
      
          result.toAppendStream[Order].print()
          print(tEnv.explain(result))
          env.execute()
        }
      
        // *************************************************************************
        //     USER DATA TYPES
        // *************************************************************************
        case class Order(user: Long, product: String, amount: Int)
      }
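
Besides tEnv.explain, the resulting DataStream-level plan can be inspected as well; for example, printing env.getExecutionPlan just before env.execute() dumps the plan of the transformations registered so far as JSON (shown here only as a usage hint for the example above).

// JSON view of the DataStream execution plan built from the registered transformations
println(env.getExecutionPlan)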
      

The overall plan transformation looks roughly like this:

      == Abstract Syntax Tree ==
      LogicalUnion(all=[true])
      :- LogicalProject(user=[$0], product=[$1], amount=[$2])
      :  +- LogicalFilter(condition=[>($2, 2)])
      :     +- LogicalTableScan(table=[[default_catalog, default_database, UnnamedTable$0]])
      +- LogicalProject(user=[$0], product=[$1], amount=[$2])
         +- LogicalFilter(condition=[<($2, 2)])
            +- LogicalTableScan(table=[[default_catalog, default_database, OrderB]])
      
      == Optimized Logical Plan ==
      Union(all=[true], union=[user, product, amount])
      :- Calc(select=[user, product, amount], where=[>(amount, 2)])
      :  +- DataStreamScan(table=[[default_catalog, default_database, UnnamedTable$0]], fields=[user, product, amount])
      +- Calc(select=[user, product, amount], where=[<(amount, 2)])
         +- DataStreamScan(table=[[default_catalog, default_database, OrderB]], fields=[user, product, amount])
      
      == Physical Execution Plan ==
      Stage 1 : Data Source
      	content : Source: Collection Source
      
      Stage 2 : Data Source
      	content : Source: Collection Source
      
      	Stage 10 : Operator
      		content : SourceConversion(table=[default_catalog.default_database.UnnamedTable$0], fields=[user, product, amount])
      		ship_strategy : FORWARD
      
      		Stage 11 : Operator
      			content : Calc(select=[user, product, amount], where=[(amount > 2)])
      			ship_strategy : FORWARD
      
      			Stage 12 : Operator
      				content : SourceConversion(table=[default_catalog.default_database.OrderB], fields=[user, product, amount])
      				ship_strategy : FORWARD
      
      				Stage 13 : Operator
      					content : Calc(select=[user, product, amount], where=[(amount < 2)])
      					ship_strategy : FORWARD
      
      

0x06 Code Example: OUTER JOIN

import java.sql.Timestamp
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row
      
      import scala.collection.mutable
      
      object SimpleOuterJoin {
        def main(args: Array[String]): Unit = {
      
          val params = ParameterTool.fromArgs(args)
          val planner = if (params.has("planner")) params.get("planner") else "Flink"
      
          val env = StreamExecutionEnvironment.getExecutionEnvironment
      
          val tEnv = if (planner == "blink") {  // use blink planner in streaming mode
            val settings = EnvironmentSettings.newInstance()
              .useBlinkPlanner()
              .inStreamingMode()
              .build()
            StreamTableEnvironment.create(env, settings)
          } else if (planner == "Flink") {  // use Flink planner in streaming mode
            StreamTableEnvironment.create(env)
          } else {
            System.err.println("The planner is incorrect. Please run 'StreamSQLExample --planner <planner>', " +
              "where planner (it is either Flink or blink, and the default is Flink) indicates whether the " +
              "example uses Flink planner or blink planner.")
            return
          }
      
          env.setParallelism(1)
          env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // build the orders data
          val ordersData = new mutable.MutableList[(String, String)]
          ordersData.+=(("001", "iphone"))
          ordersData.+=(("002", "mac"))
          ordersData.+=(("003", "book"))
          ordersData.+=(("004", "cup"))
      
    // build the payments data
          val paymentData = new mutable.MutableList[(String, String)]
          paymentData.+=(("001", "alipay"))
          paymentData.+=(("002", "card"))
          paymentData.+=(("003", "card"))
          paymentData.+=(("004", "alipay"))
          val orders = env
            .fromCollection(ordersData)
             .toTable(tEnv, 'orderId, 'productName)
          val ratesHistory = env
            .fromCollection(paymentData)
            .toTable(tEnv, 'orderId, 'payType)
      
          tEnv.registerTable("Orders", orders)
          tEnv.registerTable("Payment", ratesHistory)
      
          var sqlQuery =
            """
              |SELECT
              |  o.orderId,
              |  o.productName,
              |  p.payType
              |FROM
              |  Orders AS o left outer JOIN Payment AS p ON o.orderId = p.orderId
              |""".stripMargin
          tEnv.registerTable("TemporalJoinResult", tEnv.sqlQuery(sqlQuery))
      
          val result = tEnv.scan("TemporalJoinResult").toRetractStream[Row]
          result.print()
          print(tEnv.explain(tEnv.sqlQuery(sqlQuery)))
          env.execute()
        }
      }
      

The overall plan transformation is as follows:

      == Abstract Syntax Tree ==
      LogicalProject(orderId=[$0], productName=[$1], payType=[$3])
      +- LogicalJoin(condition=[=($0, $2)], joinType=[left])
         :- LogicalTableScan(table=[[default_catalog, default_database, Orders]])
         +- LogicalTableScan(table=[[default_catalog, default_database, Payment]])
      
      == Optimized Logical Plan ==
      Calc(select=[orderId, productName, payType])
      +- Join(joinType=[LeftOuterJoin], where=[=(orderId, orderId0)], select=[orderId, productName, orderId0, payType], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
         :- Exchange(distribution=[hash[orderId]])
         :  +- DataStreamScan(table=[[default_catalog, default_database, Orders]], fields=[orderId, productName])
         +- Exchange(distribution=[hash[orderId]])
            +- DataStreamScan(table=[[default_catalog, default_database, Payment]], fields=[orderId, payType])
      
      == Physical Execution Plan ==
      Stage 1 : Data Source
      	content : Source: Collection Source
      
      Stage 2 : Data Source
      	content : Source: Collection Source
      
      	Stage 11 : Operator
      		content : SourceConversion(table=[default_catalog.default_database.Orders], fields=[orderId, productName])
      		ship_strategy : FORWARD
      
      		Stage 13 : Operator
      			content : SourceConversion(table=[default_catalog.default_database.Payment], fields=[orderId, payType])
      			ship_strategy : FORWARD
      
      			Stage 15 : Operator
      				content : Join(joinType=[LeftOuterJoin], where=[(orderId = orderId0)], select=[orderId, productName, orderId0, payType], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
      				ship_strategy : HASH
      
      				Stage 16 : Operator
      					content : Calc(select=[orderId, productName, payType])
      					ship_strategy : FORWARD
      
The output is shown below. Because this is a retract stream over a left outer join, each order is first emitted padded with null, and that row is later retracted (false) and re-emitted once the matching payment record arrives:
      (true,001,iphone,null)
      (false,001,iphone,null)
      (true,001,iphone,alipay)
      (true,002,mac,null)
      (false,002,mac,null)
      (true,002,mac,card)
      (true,003,book,null)
      (false,003,book,null)
      (true,003,book,card)
      (true,004,cup,null)
      (false,004,cup,null)
      (true,004,cup,alipay)
      

Below are the call stacks captured while debugging; they may be useful as a reference.

// Invoking the Rules during optimization
      
matches:49, StreamExecJoinRule (org.apache.flink.table.planner.plan.rules.physical.stream)
matchRecurse:263, VolcanoRuleCall (org.apache.calcite.plan.volcano)
matchRecurse:370, VolcanoRuleCall (org.apache.calcite.plan.volcano)
matchRecurse:370, VolcanoRuleCall (org.apache.calcite.plan.volcano)
match:247, VolcanoRuleCall (org.apache.calcite.plan.volcano)
fireRules:1534, VolcanoPlanner (org.apache.calcite.plan.volcano)
registerImpl:1807, VolcanoPlanner (org.apache.calcite.plan.volcano)
register:846, VolcanoPlanner (org.apache.calcite.plan.volcano)
ensureRegistered:868, VolcanoPlanner (org.apache.calcite.plan.volcano)
ensureRegistered:90, VolcanoPlanner (org.apache.calcite.plan.volcano)
onRegister:329, AbstractRelNode (org.apache.calcite.rel)
registerImpl:1668, VolcanoPlanner (org.apache.calcite.plan.volcano)
register:846, VolcanoPlanner (org.apache.calcite.plan.volcano)
ensureRegistered:868, VolcanoPlanner (org.apache.calcite.plan.volcano)
ensureRegistered:90, VolcanoPlanner (org.apache.calcite.plan.volcano)
onRegister:329, AbstractRelNode (org.apache.calcite.rel)
registerImpl:1668, VolcanoPlanner (org.apache.calcite.plan.volcano)
register:846, VolcanoPlanner (org.apache.calcite.plan.volcano)
ensureRegistered:868, VolcanoPlanner (org.apache.calcite.plan.volcano)
changeTraits:529, VolcanoPlanner (org.apache.calcite.plan.volcano)
run:324, Programs$RuleSetProgram (org.apache.calcite.tools)
optimize:64, FlinkVolcanoProgram (org.apache.flink.table.planner.plan.optimize.program)
apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
foreach:891, Iterator$class (scala.collection)
foreach:1334, AbstractIterator (scala.collection)
foreach:72, IterableLike$class (scala.collection)
foreach:54, AbstractIterable (scala.collection)
foldLeft:157, TraversableOnce$class (scala.collection)
foldLeft:104, AbstractTraversable (scala.collection)
optimize:57, FlinkChainedProgram (org.apache.flink.table.planner.plan.optimize.program)
optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
optimize:77, CommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
optimize:248, PlannerBase (org.apache.flink.table.planner.delegation)
translate:151, PlannerBase (org.apache.flink.table.planner.delegation)
toDataStream:210, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toRetractStream:127, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toRetractStream:146, TableConversions (org.apache.flink.table.api.scala)
main:75, SimpleOuterJoin$ (spendreport)
main:-1, SimpleOuterJoin (spendreport)
        
        
// Translating the physical nodes into Flink operators (Transformations)
        
      translateToPlanInternal:140, StreamExecJoin (org.apache.flink.table.planner.plan.nodes.physical.stream)
      translateToPlanInternal:51, StreamExecJoin (org.apache.flink.table.planner.plan.nodes.physical.stream)
      translateToPlan:58, ExecNode$class (org.apache.flink.table.planner.plan.nodes.exec)
      translateToPlan:51, StreamExecJoin (org.apache.flink.table.planner.plan.nodes.physical.stream)
      translateToPlanInternal:54, StreamExecCalc (org.apache.flink.table.planner.plan.nodes.physical.stream)
      translateToPlanInternal:39, StreamExecCalc (org.apache.flink.table.planner.plan.nodes.physical.stream)
      translateToPlan:58, ExecNode$class (org.apache.flink.table.planner.plan.nodes.exec)
      translateToPlan:38, StreamExecCalcBase (org.apache.flink.table.planner.plan.nodes.physical.stream)
      translateToTransformation:184, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
      translateToPlanInternal:153, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
      translateToPlanInternal:48, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
      translateToPlan:58, ExecNode$class (org.apache.flink.table.planner.plan.nodes.exec)
      translateToPlan:48, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
      apply:60, StreamPlanner$$anonfun$translateToPlan$1 (org.apache.flink.table.planner.delegation)
      apply:59, StreamPlanner$$anonfun$translateToPlan$1 (org.apache.flink.table.planner.delegation)
      apply:234, TraversableLike$$anonfun$map$1 (scala.collection)
      apply:234, TraversableLike$$anonfun$map$1 (scala.collection)
      foreach:891, Iterator$class (scala.collection)
      foreach:1334, AbstractIterator (scala.collection)
      foreach:72, IterableLike$class (scala.collection)
      foreach:54, AbstractIterable (scala.collection)
      map:234, TraversableLike$class (scala.collection)
      map:104, AbstractTraversable (scala.collection)
      translateToPlan:59, StreamPlanner (org.apache.flink.table.planner.delegation)
      translate:153, PlannerBase (org.apache.flink.table.planner.delegation)
      toDataStream:210, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
      toRetractStream:127, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
      toRetractStream:146, TableConversions (org.apache.flink.table.api.scala)
      main:75, SimpleOuterJoin$ (spendreport)
      main:-1, SimpleOuterJoin (spendreport)  
       
// At runtime
        
      @Internal
      public final class StreamTwoInputProcessor<IN1, IN2> implements StreamInputProcessor {
      	private void processRecord2(
      			StreamRecord<IN2> record,
      			TwoInputStreamOperator<IN1, IN2, ?> streamOperator,
      			Counter numRecordsIn) throws Exception {
      
      		streamOperator.setKeyContextElement2(record);
      		streamOperator.processElement2(record);
      		postProcessRecord(numRecordsIn);
      	}  
      }    
      
// As can be seen, the streamOperator here is the StreamingJoinOperator
      
      streamOperator = {StreamingJoinOperator@10943} 
       leftIsOuter = true
       rightIsOuter = false
       outRow = {JoinedRow@10948} "JoinedRow{row1=org.apache.flink.table.dataformat.BinaryRow@dc6a1b67, row2=(+|null,null)}"
       leftNullRow = {GenericRow@10949} "(+|null,null)"
       rightNullRow = {GenericRow@10950} "(+|null,null)"
       leftRecordStateView = {OuterJoinRecordStateViews$InputSideHasNoUniqueKey@10945} 
       rightRecordStateView = {JoinRecordStateViews$InputSideHasNoUniqueKey@10946} 
       generatedJoinCondition = {GeneratedJoinCondition@10951} 
       leftType = {BaseRowTypeInfo@10952} "BaseRow(orderId: STRING, productName: STRING)"
       rightType = {BaseRowTypeInfo@10953} "BaseRow(orderId: STRING, payType: STRING)"
       leftInputSideSpec = {JoinInputSideSpec@10954} "NoUniqueKey"
       rightInputSideSpec = {JoinInputSideSpec@10955} "NoUniqueKey"
       nullFilterKeys = {int[1]@10956} 
       nullSafe = false
       filterAllNulls = true
       minRetentionTime = 0
       stateCleaningEnabled = false
       joinCondition = {AbstractStreamingJoinOperator$JoinConditionWithNullFilters@10947} 
       collector = {TimestampedCollector@10957} 
       chainingStrategy = {ChainingStrategy@10958} "HEAD"
       container = {TwoInputStreamTask@10959} "Join(joinType=[LeftOuterJoin], where=[(orderId = orderId0)], select=[orderId, productName, orderId0, payType], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) -> Calc(select=[orderId, productName, payType]) -> SinkConversionToTuple2 -> Sink: Print to Std. Out (1/1)"
       config = {StreamConfig@10960} "\n=======================Stream Config=======================\nNumber of non-chained inputs: 2\nNumber of non-chained outputs: 0\nOutput names: []\nPartitioning:\nChained subtasks: [(Join(joinType=[LeftOuterJoin], where=[(orderId = orderId0)], select=[orderId, productName, orderId0, payType], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])-7 -> Calc(select=[orderId, productName, payType])-8, typeNumber=0, selectedNames=[], outputPartitioner=FORWARD, outputTag=null)]\nOperator: SimpleOperatorFactory\nBuffer timeout: 100\nState Monitoring: false\n\n\n---------------------\nChained task configs\n---------------------\n{8=\n=======================Stream Config=======================\nNumber of non-chained inputs: 0\nNumber of non-chained outputs: 0\nOutput names: []\nPartitioning:\nChained subtasks: [(Calc(select=[orderId, productName, payType])-8 -> SinkConversionToTuple2-9, typeNumber=0, selectedNames=[], outputPartitioner=FORWARD, outputTag=null)]\nOperator: CodeGenOperatorFactory\nBuffer timeout: "
       output = {AbstractStreamOperator$CountingOutput@10961} 
       runtimeContext = {StreamingRuntimeContext@10962} 
       stateKeySelector1 = {BinaryRowKeySelector@10963} 
       stateKeySelector2 = {BinaryRowKeySelector@10964} 
       keyedStateBackend = {HeapKeyedStateBackend@10965} "HeapKeyedStateBackend"
       keyedStateStore = {DefaultKeyedStateStore@10966} 
       operatorStateBackend = {DefaultOperatorStateBackend@10967} 
       metrics = {OperatorMetricGroup@10968} 
       latencyStats = {LatencyStats@10969} 
       processingTimeService = {ProcessingTimeServiceImpl@10970} 
       timeServiceManager = {InternalTimeServiceManager@10971} 
       combinedWatermark = -9223372036854775808
       input1Watermark = -9223372036854775808
       input2Watermark = -9223372036854775808
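
The call stacks below show how StreamTwoInputProcessor drives this operator: records from the left table enter through processElement1 and records from the right table through processElement2, and, as the leftRecordStateView / rightRecordStateView fields in the dump above suggest, both sides end up in one shared join routine that stores the incoming row on its own side and probes the state of the opposite side. A minimal, self-contained sketch of that dispatch pattern (hypothetical types and in-memory state, not the real StreamingJoinOperator code, which uses Flink keyed state and also handles retractions and outer-join null padding):

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Conceptual sketch only: "two inputs, one join routine".
public class TwoInputJoinSketch {
    private final Map<String, List<String>> leftState = new HashMap<>();   // cf. leftRecordStateView
    private final Map<String, List<String>> rightState = new HashMap<>();  // cf. rightRecordStateView

    // record arrived on the first (left) input
    public void processElement1(String key, String leftRow) {
        processElement(key, leftRow, leftState, rightState, true);
    }

    // record arrived on the second (right) input
    public void processElement2(String key, String rightRow) {
        processElement(key, rightRow, rightState, leftState, false);
    }

    private void processElement(String key, String row,
                                Map<String, List<String>> mySideState,
                                Map<String, List<String>> otherSideState,
                                boolean inputIsLeft) {
        // 1. remember this row on its own side
        mySideState.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
        // 2. probe the other side's state and emit joined rows
        for (String other : otherSideState.getOrDefault(key, Collections.emptyList())) {
            System.out.println(inputIsLeft ? row + " | " + other : other + " | " + row);
        }
    }
}

Funneling both inputs into one processElement with a "which side" flag is what lets the same code path serve inner as well as left/right outer joins; only the null-padding behavior differs.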
        
// Processing table 1 (left input)
      
processElement1:118, StreamingJoinOperator (org.apache.flink.table.runtime.operators.join.stream)
processRecord1:135, StreamTwoInputProcessor (org.apache.flink.streaming.runtime.io)
lambda$new$0:100, StreamTwoInputProcessor (org.apache.flink.streaming.runtime.io)
accept:-1, 169462196 (org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$$Lambda$733)
emitRecord:362, StreamTwoInputProcessor$StreamTaskNetworkOutput (org.apache.flink.streaming.runtime.io)
processElement:151, StreamTaskNetworkInput (org.apache.flink.streaming.runtime.io)
emitNext:128, StreamTaskNetworkInput (org.apache.flink.streaming.runtime.io)
processInput:182, StreamTwoInputProcessor (org.apache.flink.streaming.runtime.io)
processInput:311, StreamTask (org.apache.flink.streaming.runtime.tasks)
runDefaultAction:-1, 1284793893 (org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$713)
runMailboxLoop:187, MailboxProcessor (org.apache.flink.streaming.runtime.tasks.mailbox)
runMailboxLoop:487, StreamTask (org.apache.flink.streaming.runtime.tasks)
invoke:470, StreamTask (org.apache.flink.streaming.runtime.tasks)
doRun:707, Task (org.apache.flink.runtime.taskmanager)
run:532, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)
        
// Processing table 2 (right input)
      
processElement2:123, StreamingJoinOperator (org.apache.flink.table.runtime.operators.join.stream)
processRecord2:145, StreamTwoInputProcessor (org.apache.flink.streaming.runtime.io)
lambda$new$1:107, StreamTwoInputProcessor (org.apache.flink.streaming.runtime.io)
accept:-1, 76811487 (org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$$Lambda$734)
emitRecord:362, StreamTwoInputProcessor$StreamTaskNetworkOutput (org.apache.flink.streaming.runtime.io)
processElement:151, StreamTaskNetworkInput (org.apache.flink.streaming.runtime.io)
emitNext:128, StreamTaskNetworkInput (org.apache.flink.streaming.runtime.io)
processInput:185, StreamTwoInputProcessor (org.apache.flink.streaming.runtime.io)
processInput:311, StreamTask (org.apache.flink.streaming.runtime.tasks)
runDefaultAction:-1, 1284793893 (org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$713)
runMailboxLoop:187, MailboxProcessor (org.apache.flink.streaming.runtime.tasks.mailbox)
runMailboxLoop:487, StreamTask (org.apache.flink.streaming.runtime.tasks)
invoke:470, StreamTask (org.apache.flink.streaming.runtime.tasks)
doRun:707, Task (org.apache.flink.runtime.taskmanager)
run:532, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)
        
      

0x07 Code Example: WINDOW JOIN

      import java.sql.Timestamp
      import org.apache.flink.api.java.utils.ParameterTool
      import org.apache.flink.api.scala._
      import org.apache.flink.streaming.api.TimeCharacteristic
      import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
      import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
      import org.apache.flink.streaming.api.windowing.time.Time
      import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
      import org.apache.flink.table.api.scala._
      import org.apache.flink.types.Row
      
      import scala.collection.mutable
      
      object SimpleTimeIntervalJoinA {
        def main(args: Array[String]): Unit = {
          val params = ParameterTool.fromArgs(args)
          val planner = if (params.has("planner")) params.get("planner") else "flink"
      
          val env = StreamExecutionEnvironment.getExecutionEnvironment
      
          val tEnv = if (planner == "blink") {  // use blink planner in streaming mode
            val settings = EnvironmentSettings.newInstance()
              .useBlinkPlanner()
              .inStreamingMode()
              .build()
            StreamTableEnvironment.create(env, settings)
          } else if (planner == "flink") {  // use flink planner in streaming mode
            StreamTableEnvironment.create(env)
          } else {
            System.err.println("The planner is incorrect. Please run 'StreamSQLExample --planner <planner>', " +
              "where planner (it is either flink or blink, and the default is flink) indicates whether the " +
              "example uses flink planner or blink planner.")
            return
          }
          env.setParallelism(1)
          env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // Build the orders data
          val ordersData = new mutable.MutableList[(String, String, Timestamp)]
          ordersData.+=(("001", "iphone", new Timestamp(1545800002000L)))
          ordersData.+=(("002", "mac", new Timestamp(1545800003000L)))
          ordersData.+=(("003", "book", new Timestamp(1545800004000L)))
          ordersData.+=(("004", "cup", new Timestamp(1545800018000L)))
      
    // Build the payments data
          val paymentData = new mutable.MutableList[(String, String, Timestamp)]
          paymentData.+=(("001", "alipay", new Timestamp(1545803501000L)))
          paymentData.+=(("002", "card", new Timestamp(1545803602000L)))
          paymentData.+=(("003", "card", new Timestamp(1545803610000L)))
          paymentData.+=(("004", "alipay", new Timestamp(1545803611000L)))
          val orders = env
            .fromCollection(ordersData)
            .assignTimestampsAndWatermarks(new TimestampExtractor[String, String]())
            .toTable(tEnv, 'orderId, 'productName, 'orderTime.rowtime)
          val ratesHistory = env
            .fromCollection(paymentData)
            .assignTimestampsAndWatermarks(new TimestampExtractor[String, String]())
            .toTable(tEnv, 'orderId, 'payType, 'payTime.rowtime)
      
          tEnv.registerTable("Orders", orders)
          tEnv.registerTable("Payment", ratesHistory)
      
    val sqlQuery =
            """
              |SELECT
              |  o.orderId,
              |  o.productName,
              |  p.payType,
              |  o.orderTime,
              |  cast(payTime as timestamp) as payTime
              |FROM
              |  Orders AS o left outer JOIN Payment AS p ON o.orderId = p.orderId AND
              | p.payTime BETWEEN orderTime AND orderTime + INTERVAL '1' HOUR
              |""".stripMargin
          tEnv.registerTable("TemporalJoinResult", tEnv.sqlQuery(sqlQuery))
      
          val result = tEnv.scan("TemporalJoinResult").toAppendStream[Row]
          result.print()
          print(tEnv.explain(tEnv.sqlQuery(sqlQuery)))
          env.execute()
        }
      }
      
      class TimestampExtractor[T1, T2]
        extends BoundedOutOfOrdernessTimestampExtractor[(T1, T2, Timestamp)](Time.seconds(10)) {
        override def extractTimestamp(element: (T1, T2, Timestamp)): Long = {
          element._3.getTime
        }
      }
      

The output is as follows (this run appears to use the blink planner, judging by the plan node names below). Note that order 003 ends up with a null payment: its payTime misses the one-hour window, so the left outer join still emits the order row with the payment side padded to null (a short arithmetic check follows the results).

      == Abstract Syntax Tree ==
      LogicalProject(orderId=[$0], productName=[$1], payType=[$4], orderTime=[$2], payTime=[CAST($5):TIMESTAMP(6)])
      +- LogicalJoin(condition=[AND(=($0, $3), >=($5, $2), <=($5, +($2, 3600000:INTERVAL HOUR)))], joinType=[left])
         :- LogicalTableScan(table=[[default_catalog, default_database, Orders]])
         +- LogicalTableScan(table=[[default_catalog, default_database, Payment]])
      
      == Optimized Logical Plan ==
      Calc(select=[orderId, productName, payType, orderTime, CAST(CAST(payTime)) AS payTime])
      +- WindowJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, leftLowerBound=-3600000, leftUpperBound=0, leftTimeIndex=2, rightTimeIndex=2], where=[AND(=(orderId, orderId0), >=(payTime, orderTime), <=(payTime, +(orderTime, 3600000:INTERVAL HOUR)))], select=[orderId, productName, orderTime, orderId0, payType, payTime])
         :- Exchange(distribution=[hash[orderId]])
         :  +- DataStreamScan(table=[[default_catalog, default_database, Orders]], fields=[orderId, productName, orderTime])
         +- Exchange(distribution=[hash[orderId]])
            +- DataStreamScan(table=[[default_catalog, default_database, Payment]], fields=[orderId, payType, payTime])
      
      == Physical Execution Plan ==
      Stage 1 : Data Source
      	content : Source: Collection Source
      
      	Stage 2 : Operator
      		content : Timestamps/Watermarks
      		ship_strategy : FORWARD
      
      Stage 3 : Data Source
      	content : Source: Collection Source
      
      	Stage 4 : Operator
      		content : Timestamps/Watermarks
      		ship_strategy : FORWARD
      
      		Stage 13 : Operator
      			content : SourceConversion(table=[default_catalog.default_database.Orders], fields=[orderId, productName, orderTime])
      			ship_strategy : FORWARD
      
      			Stage 15 : Operator
      				content : SourceConversion(table=[default_catalog.default_database.Payment], fields=[orderId, payType, payTime])
      				ship_strategy : FORWARD
      
      				Stage 17 : Operator
      					content : WindowJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, leftLowerBound=-3600000, leftUpperBound=0, leftTimeIndex=2, rightTimeIndex=2], where=[((orderId = orderId0) AND (payTime >= orderTime) AND (payTime <= (orderTime + 3600000:INTERVAL HOUR)))], select=[orderId, productName, orderTime, orderId0, payType, payTime])
      					ship_strategy : HASH
      
      					Stage 18 : Operator
      						content : Calc(select=[orderId, productName, payType, orderTime, CAST(CAST(payTime)) AS payTime])
      						ship_strategy : FORWARD
      
      001,iphone,alipay,2018-12-26T04:53:22,2018-12-26T05:51:41
      002,mac,card,2018-12-26T04:53:23,2018-12-26T05:53:22
      004,cup,alipay,2018-12-26T04:53:38,2018-12-26T05:53:31
      003,book,null,2018-12-26T04:53:24,null
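
To see why only order 003 is null-padded, here is a quick standalone check (plain Java, timestamps copied from ordersData / paymentData above) of whether each payTime falls within one hour of its orderTime, which is exactly the BETWEEN condition in the query:

public class WindowCheck {
    public static void main(String[] args) {
        final long HOUR = 3_600_000L;
        // {orderTime, payTime} in epoch millis
        long[][] pairs = {
            {1545800002000L, 1545803501000L}, // order 001: 58m19s
            {1545800003000L, 1545803602000L}, // order 002: 59m59s
            {1545800004000L, 1545803610000L}, // order 003: 1h00m06s -> misses the window
            {1545800018000L, 1545803611000L}, // order 004: 59m53s
        };
        for (long[] p : pairs) {
            long diff = p[1] - p[0];
            // join condition: payTime BETWEEN orderTime AND orderTime + INTERVAL '1' HOUR
            System.out.println(diff + " ms -> " + (diff >= 0 && diff <= HOUR));
        }
    }
}

Only order 003 fails this check, so the LeftOuterJoin emits it with a null payment side.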
      

Related classes and call stacks

// Stream physical RelNode for the window (interval) join; its translateToPlanInternal
// generates the interval-join runtime operators
class StreamExecWindowJoin {  
}

// ConverterRule that turns a FlinkLogicalJoin into a StreamExecWindowJoin; it only matches
// when the join predicate contains time-window bounds between the two inputs (here:
// payTime BETWEEN orderTime AND orderTime + INTERVAL '1' HOUR). Without such bounds,
// StreamExecJoinRule applies instead and an unbounded StreamingJoinOperator is generated.
class StreamExecWindowJoinRule
  extends ConverterRule(
    classOf[FlinkLogicalJoin],
    FlinkConventions.LOGICAL,
    FlinkConventions.STREAM_PHYSICAL,
    "StreamExecWindowJoinRule") {
}
      
      
      matches:54, StreamExecWindowJoinRule (org.apache.flink.table.planner.plan.rules.physical.stream)
      matchRecurse:263, VolcanoRuleCall (org.apache.calcite.plan.volcano)
      match:247, VolcanoRuleCall (org.apache.calcite.plan.volcano)
      fireRules:1534, VolcanoPlanner (org.apache.calcite.plan.volcano)
      registerImpl:1807, VolcanoPlanner (org.apache.calcite.plan.volcano)
      register:846, VolcanoPlanner (org.apache.calcite.plan.volcano)
      ensureRegistered:868, VolcanoPlanner (org.apache.calcite.plan.volcano)
      ensureRegistered:90, VolcanoPlanner (org.apache.calcite.plan.volcano)
      onRegister:329, AbstractRelNode (org.apache.calcite.rel)
      registerImpl:1668, VolcanoPlanner (org.apache.calcite.plan.volcano)
      register:846, VolcanoPlanner (org.apache.calcite.plan.volcano)
      ensureRegistered:868, VolcanoPlanner (org.apache.calcite.plan.volcano)
      ensureRegistered:90, VolcanoPlanner (org.apache.calcite.plan.volcano)
      onRegister:329, AbstractRelNode (org.apache.calcite.rel)
      registerImpl:1668, VolcanoPlanner (org.apache.calcite.plan.volcano)
      register:846, VolcanoPlanner (org.apache.calcite.plan.volcano)
      ensureRegistered:868, VolcanoPlanner (org.apache.calcite.plan.volcano)
      changeTraits:529, VolcanoPlanner (org.apache.calcite.plan.volcano)
      run:324, Programs$RuleSetProgram (org.apache.calcite.tools)
      optimize:64, FlinkVolcanoProgram (org.apache.flink.table.planner.plan.optimize.program)
      apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
      apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
      apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
      apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
      foreach:891, Iterator$class (scala.collection)
      foreach:1334, AbstractIterator (scala.collection)
      foreach:72, IterableLike$class (scala.collection)
      foreach:54, AbstractIterable (scala.collection)
      foldLeft:157, TraversableOnce$class (scala.collection)
      foldLeft:104, AbstractTraversable (scala.collection)
      optimize:57, FlinkChainedProgram (org.apache.flink.table.planner.plan.optimize.program)
      optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
      doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
      optimize:77, CommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
      optimize:248, PlannerBase (org.apache.flink.table.planner.delegation)
      translate:151, PlannerBase (org.apache.flink.table.planner.delegation)
      toDataStream:210, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
      toAppendStream:107, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
      toAppendStream:101, TableConversions (org.apache.flink.table.api.scala)
      main:93, SimpleTimeIntervalJoinA$ (spendreport)
      main:-1, SimpleTimeIntervalJoinA (spendreport)
        
        
      translateToPlanInternal:136, StreamExecWindowJoin (org.apache.flink.table.planner.plan.nodes.physical.stream)
      translateToPlanInternal:53, StreamExecWindowJoin (org.apache.flink.table.planner.plan.nodes.physical.stream)
      translateToPlan:58, ExecNode$class (org.apache.flink.table.planner.plan.nodes.exec)
      translateToPlan:53, StreamExecWindowJoin (org.apache.flink.table.planner.plan.nodes.physical.stream)
      translateToPlanInternal:54, StreamExecCalc (org.apache.flink.table.planner.plan.nodes.physical.stream)
      translateToPlanInternal:39, StreamExecCalc (org.apache.flink.table.planner.plan.nodes.physical.stream)
      translateToPlan:58, ExecNode$class (org.apache.flink.table.planner.plan.nodes.exec)
      translateToPlan:38, StreamExecCalcBase (org.apache.flink.table.planner.plan.nodes.physical.stream)
      translateToTransformation:184, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
      translateToPlanInternal:153, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
      translateToPlanInternal:48, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
      translateToPlan:58, ExecNode$class (org.apache.flink.table.planner.plan.nodes.exec)
      translateToPlan:48, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
      apply:60, StreamPlanner$$anonfun$translateToPlan$1 (org.apache.flink.table.planner.delegation)
      apply:59, StreamPlanner$$anonfun$translateToPlan$1 (org.apache.flink.table.planner.delegation)
      apply:234, TraversableLike$$anonfun$map$1 (scala.collection)
      apply:234, TraversableLike$$anonfun$map$1 (scala.collection)
      foreach:891, Iterator$class (scala.collection)
      foreach:1334, AbstractIterator (scala.collection)
      foreach:72, IterableLike$class (scala.collection)
      foreach:54, AbstractIterable (scala.collection)
      map:234, TraversableLike$class (scala.collection)
      map:104, AbstractTraversable (scala.collection)
      translateToPlan:59, StreamPlanner (org.apache.flink.table.planner.delegation)
      translate:153, PlannerBase (org.apache.flink.table.planner.delegation)
      toDataStream:210, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
      toAppendStream:107, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
      toAppendStream:101, TableConversions (org.apache.flink.table.api.scala)
      main:93, SimpleTimeIntervalJoinA$ (spendreport)
      main:-1, SimpleTimeIntervalJoinA (spendreport)   
      

0x08 References

      Flink table&Sql中使用Calcite

      Flink sql的實現

      Calcite 功能簡析及在 Flink 的應用

      基于Flink1.8 深入理解Flink Sql執行流程 + Flink Sql語法擴展

      使用Flink Table &Sql api來構建批量和流式應用(3)Flink Sql 使用

      Flink關系型API: Table API 與SQL

      Flink如何實現動態表與靜態表的Join操作

      一文解析Flink SQL工作流程

      Flink1.9-table/SQLAPI

      【Flink SQL引擎】:Calcite 功能簡析及在 Flink 的應用

      Apache Calcite 處理流程詳解(一)

      Apache Calcite 優化器詳解(二)

      揭秘 Flink 1.9 新架構,Blink Planner 你會用了嗎?

      Flink 原理與實現:Table & SQL API | Jark's Blog
