<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      Flink table&Sql中使用Calcite

       

      Apache Calcite是什么東東

      Apache Calcite面向Hadoop新的sql引擎,它提供了標準的SQL語言、多種查詢優化和連接各種數據源的能力。除此之外,Calcite還提供了OLAP和流處理的查詢引擎。它2013年成為了Apache孵化項目以來,在Hadoop中越來越引人注目,并被眾多項目集成。比如Flink/Storm/Drill/Phoenix都依賴它做sql解析和優化。

      Flink 結合 Calcite

      Flink Table API&SQL 為流式數據和靜態數據的關系查詢保留統一的接口,而且利用了Calcite的查詢優化框架和SQL parser。該設計是基于Flink已構建好的API構建的,DataStream API 提供低延時高吞吐的流處理能力而且就有exactly-once語義而且可以基于event-time進行處理。而且DataSet擁有穩定高效的內存算子和流水線式的數據交換。Flink的core API和引擎的所有改進都會自動應用到Table API和SQL上。
      一條stream sql從提交到calcite解析、優化最后到flink引擎執行,一般分為以下幾個階段:

      1 1. Sql Parser: 將sql語句通過java cc解析成AST(語法樹),在calcite中用SqlNode表示AST;
      2 2. Sql Validator: 結合數字字典(catalog)去驗證sql語法;
      3 3. 生成Logical Plan: 將sqlNode表示的AST轉換成LogicalPlan, 用relNode表示;
      4 4. 生成 optimized LogicalPlan: 先基于calcite rules 去優化logical Plan,
      5 再基于flink定制的一些優化rules去優化logical Plan;
      6 5. 生成Flink PhysicalPlan: 這里也是基于flink里頭的rules將,將optimized LogicalPlan轉成成Flink的物理執行計劃;
      7 6. 將物理執行計劃轉成Flink ExecutionPlan: 就是調用相應的tanslateToPlan方法轉換和利用CodeGen元編程成Flink的各種算子。

      而如果是通過table api來提交任務的話,也會經過calcite優化等階段,基本流程和直接運行sql類似:

      1. table api parser: flink會把table api表達的計算邏輯也表示成一顆樹,用treeNode去表式;
      在這棵樹上的每個節點的計算邏輯用Expression來表示。
      2. Validate: 會結合數字字典(catalog)將樹的每個節點的Unresolved Expression進行綁定,生成Resolved Expression;
      3. 生成Logical Plan: 依次遍歷數的每個節點,調用construct方法將原先用treeNode表達的節點轉成成用calcite 內部的數據結構relNode 來表達。即生成了LogicalPlan, 用relNode表示;
      4. 生成 optimized LogicalPlan: 先基于calcite rules 去優化logical Plan,
      再基于flink定制的一些優化rules去優化logical Plan;
      5. 生成Flink PhysicalPlan: 這里也是基于flink里頭的rules將,將optimized LogicalPlan轉成成Flink的物理執行計劃;
      6. 將物理執行計劃轉成Flink ExecutionPlan: 就是調用相應的tanslateToPlan方法轉換和利用CodeGen元編程成Flink的各種算子。

      所以在flink提供兩種API進行關系型查詢,Table API 和 SQL。這兩種API的查詢都會用包含注冊過的Table的catalog進行驗證,除了在開始階段從計算邏輯轉成logical plan有點差別以外,之后都差不多。同時在stream和batch的查詢看起來也是完全一樣。只不過flink會根據數據源的性質(流式和靜態)使用不同的規則進行優化, 最終優化后的plan轉傳成常規的Flink DataSet 或 DataStream 程序。所以我們下面統一用table api來舉例講解flink是如何用calcite做解析優化,再轉換成回DataStream。

      Table api任務的解析執行過程

      Table Example

       1 // set up execution environment 
       2 val env = StreamExecutionEnvironment.getExecutionEnvironment
       3 val tEnv = TableEnvironment.getTableEnvironment(env) 
       4 //定義數據源 
       5 val dataStream = env.fromCollection(Seq( Order(1L, "beer", 3), Order(1L, "diaper", 4), Order(3L, "rubber", 2))) 
       6 //將DataStream 轉換成 table,就是將數據源在TableEnvironment中注冊成表 
       7 val orderA = dataStream.toTable(tEnv) 
       8 //用table api執行業務邏輯, 生成tab里頭包含了flink 自己的logicalPlan,用LogicalNode表示 
       9 val tab = orderA.groupBy('user).select('user, 'amount.sum) 
      10       .filter('user < 2L) 
      11 //將table轉成成DataStream, 這里頭就是涉及到我們calcite邏輯計劃生成 
      12 // 優化、轉成可可執行的flink 算子等過程 
      13 val result = tab.toDataStream[Order]

      將數據源注冊成表

      將DataStream 轉換成table的過程,其實就是將DataStream在TableEnvironment中注冊成表的過程中,主要是通過調用tableEnv.fromDataStream方法完成。

      1 // 生成一個唯一性表名 val name = createUniqueTableName() 
      2 //生成表的 scheme val (fieldNames, fieldIndexes) = getFieldInfo[T](dataStream.getType) 
      3 //傳入dataStream, 創建calcite可以識別的表 
      4 val dataStreamTable = new DataStreamTable[T]( 
      5       dataStream, 
      6       fieldIndexes, 
      7       fieldNames, None, None ) 
      8 //在數字字典里頭注冊該表 registerTableInternal(name, dataStreamTable)

      上面函數實現的最后會調用scan,這里頭會創建一個CatalogNode對象,里頭攜帶了可以查找到數據源的表路徑。其實它是Flink 邏輯樹上的一個葉節點。

      生成Flink 自身的邏輯計劃

      1 val tab = orderA.groupBy('user).select('user, 'amount.sum)
      2       .filter('user < 2L) 

      上面每次調用table api,就會生成Flink 邏輯計劃的節點。比如grouBy和select的調用會生成節點Project、Aggregate、Project,而filter的調用會生成節點Filter。這些節點的邏輯關系,就會組成下圖的一個Flink 自身數據結構表達的一顆邏輯樹:

      因為這個例子很簡單,節點都沒有兩個子節點。這里的實現可能有的人會奇怪,filter函數的形參類型是Expression,而我們傳進去的是"'user<2L",是不是不對呀? 其實這是scala比較牛逼的特性:隱式轉換,這些傳遞的表達式會先自動轉換成Expression。這些隱式轉換的定義基本都在接口類ImplicitExpressionOperations里頭。其中user前面定義的'符號,則scala會將user字符串轉化成Symbol類型。通過隱式轉換"'user<2L"表示式會生成一個LessThan對象,它會有兩個孩子Expression,分別是UnresolvedFieldReference("user")和Liter("2")。這個LessThan對象會作為Filter對象的condition。

      Flink 自身的邏輯計劃 轉換成calcite可識別的邏輯計劃

      根據上面分析我們只是生成了Flink的 logical Plan,我們必須將它轉換成calcite的logical Plan,這樣我們才能用到calcite強大的優化規則。在Flink里頭會由上往下一次調用各個節點的construct方法,將Flink節點轉換成calcite的RelNode節點。

       1 //-----Filter的construct創建Calcite 的 LogicalFilter節點---- 
       2     //先遍歷子節點 
       3     child.construct(relBuilder) 
       4     //創建LogicalFilter 
       5     relBuilder.filter(condition.toRexNode(relBuilder)) 
       6     
       7 //-----Project的construct創建Calcite的LogicalProject節點---- 
       8    //先遍歷子節點 
       9     child.construct(relBuilder) 
      10    //創建LogicalProject 
      11     relBuilder.project( 
      12       projectList.map(_.toRexNode(relBuilder)).asJava, 
      13       projectList.map(_.name).asJava, 
      14       true) 
      15       
      16 //-----Aggregate的construct創建Calcite的LogicalAggregate節點---- 
      17     child.construct(relBuilder) 
      18     relBuilder.aggregate( 
      19   relBuilder.groupKey(groupingExpressions.map(_.toRexNode(relBuilder)).asJava), 
      20       aggregateExpressions.map { 
      21         case Alias(agg: Aggregation, name, _) => agg.toAggCall(name)(relBuilder) 
      22         case _ => throw new RuntimeException("This should never happen.") 
      23       }.asJava) 
      24   
      25 //-----CatalogNode的construct創建Calcite的LogicalTableScan節點---- 
      26     relBuilder.scan(tablePath.asJava)

      通過以上轉換后,就生成了Calcite邏輯計劃:

      優化邏輯計劃并轉換成Flink的物理計劃

      這部分實現Flink統一封裝在optimize方法里頭,這個方法具體的實現如下:

       1 // 去除關聯子查詢 
       2     val decorPlan = RelDecorrelator.decorrelateQuery(relNode) 
       3     // 轉換time的標識符,比如存在rowtime標識的話,我們將會引入TimeMaterializationSqlFunction operator, 
       4     //這個operator我們會在codeGen中會用到 
       5     val convPlan = RelTimeIndicatorConverter.convert(decorPlan, getRelBuilder.getRexBuilder) 
       6     // 規范化logica計劃,比如一個Filter它的過濾條件都是true的話,那么我們可以直接將這個filter去掉 
       7     val normRuleSet = getNormRuleSet
       8     val normalizedPlan = if (normRuleSet.iterator().hasNext) { 
       9       runHepPlanner(HepMatchOrder.BOTTOM_UP, normRuleSet, convPlan, convPlan.getTraitSet) 
      10     } else { 
      11       convPlan
      12     } 
      13     // 優化邏輯計劃,調整節點間的上下游到達優化計算邏輯的效果,同時將 
      14     //節點轉換成派生于FlinkLogicalRel的節點 
      15     val logicalOptRuleSet = getLogicalOptRuleSet
      16     //用FlinkConventions.LOGICAL替換traitSet,表示轉換后的樹節點要求派生與接口 
      17     // FlinkLogicalRel 
      18     val logicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.LOGICAL).simplify() 
      19     val logicalPlan = if (logicalOptRuleSet.iterator().hasNext) { 
      20       runVolcanoPlanner(logicalOptRuleSet, normalizedPlan, logicalOutputProps) 
      21     } else { 
      22       normalizedPlan
      23     } 
      24     // 將優化后的邏輯計劃轉換成Flink的物理計劃,同時將 
      25     //節點轉換成派生于DataStreamRel的節點 
      26     val physicalOptRuleSet = getPhysicalOptRuleSet
      27     val physicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.DATASTREAM).simplify() 
      28     val physicalPlan = if (physicalOptRuleSet.iterator().hasNext) { 
      29       runVolcanoPlanner(physicalOptRuleSet, logicalPlan, physicalOutputProps) 
      30     } else { 
      31       logicalPlan
      32     }

      這段涉及到多個階段,每個階段無非都是用Rule對邏輯計劃進行優化和改進。每個Rule的邏輯大家自己去看,如果我想自己自定義一個Rule該如何做呢?首先聲明定義于派生RelOptRule的一個類,然后再構造函數中要求傳入RelOptRuleOperand對象,該對象需要傳入你這個Rule將要匹配的節點類型。如果你的自定義的Rule只用于LogicalTableScan節點,那么你這個operand對象應該是operand(LogicalTableScan.class, any())。就像這樣一樣

       1 public class TableScanRule extends RelOptRule { 
       2   //~ Static fields/initializers --------------------------------------------- 
       3   public static final TableScanRule INSTANCE = new TableScanRule(); 
       4   //~ Constructors ----------------------------------------------------------- 
       5   private TableScanRule() { 
       6     super(operand(LogicalTableScan.class, any())); 
       7   } 
       8   //默認返回True, 可以繼承matches,里面實現邏輯是判斷是否進行轉換調用onMatch 
       9   @Override 
      10   public boolean matches(RelOptRuleCall call) { 
      11     return super.matches(call); 
      12   } 
      13   //~ Methods ---------------------------------------------------------------- 
      14   //對當前節點進行轉換 
      15   public void onMatch(RelOptRuleCall call) { 
      16     final LogicalTableScan oldRel = call.rel(0); 
      17     RelNode newRel = 
      18         oldRel.getTable().toRel( 
      19             RelOptUtil.getContext(oldRel.getCluster())); 
      20     call.transformTo(newRel); 
      21   } 
      22 }
      通過以上代碼對邏輯計劃進行了優化和轉換,最后會將邏輯計劃的每個節點轉換成Flink Node,既可物理計劃。整個轉換過程最后的結果如下:
      1 == Optimized pyhical Plan == DataStreamGroupAggregate(groupBy=[user], select=[user, SUM(amount) AS TMP_0])  
      2         
      3 DataStreamCalc(select=[user, amount], where=[<(user, 2)]) 
      4         
      5 DataStreamScan(table=[[_DataStreamTable_0]]) 

      我們發現Filter節點在樹結構中下移了,這樣對數據進行操作時現在過濾再做聚合,可以減少計算量。

      生成Flink 可以執行的計劃

      這一塊只要是遞歸調用各個節點DataStreamRel的translateToPlan方法,這個方法轉換和利用CodeGen元編程成Flink的各種算子。現在就相當于我們直接利用Flink的DataSet或DataStream API開發的程序。整個流程的轉換大體就像這樣:

      1 == Physical Execution Plan == 
      2 Stage 1 : Data Source
      3     content : collect elements with CollectionInputFormat
      4 Stage 2 : Operator content : from: (user, product, amount) 
      5         ship_strategy : REBALANCE 
      6 Stage 3 : Operator content : where: (<(user, 2)), select: (user, amount) 
      7             ship_strategy : FORWARD 
      8 Stage 4 : Operator content : groupBy: (user), select: (user, SUM(amount) AS TMP_0) 
      9                 ship_strategy : HASH

      總結

      不過這個樣例中忽略了流處理中最有趣的部分:window aggregate 和 join。這些操作如何用SQL表達呢?Apache Calcite社區提出了一個proposal來討論SQL on streams的語法和語義。社區將Calcite的stream SQL描述為標準SQL的擴展而不是另外的 SQL-like語言。這有很多好處,首先,熟悉SQL標準的人能夠在不學習新語法的情況下分析流數據。靜態表和流表的查詢幾乎相同,可以輕松地移植。此外,可以同時在靜態表和流表上進行查詢,這和flink的愿景是一樣的,將批處理看做特殊的流處理(批看作是有限的流)。最后,使用標準SQL進行流處理意味著有很多成熟的工具支持

       

      此文轉載自http://blog.chinaunix.net/uid-29038263-id-5765791.html,感謝。

       

      posted @ 2018-12-19 11:40  boiledwater  閱讀(7460)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 精品久久久久久无码中文野结衣| 国产精品亚洲а∨天堂2021| 无人去码一码二码三码区| 97久久精品人人做人人爽| 一本大道久久香蕉成人网| 久久久久国产一区二区| 国产精品福利午夜久久香蕉| 国产综合久久99久久| 久视频久免费视频久免费| 性欧美VIDEOFREE高清大喷水| 亚洲国产精品久久久天堂麻豆宅男 | 国产一区二区不卡自拍| 乱女伦露脸对白在线播放| 日韩欧激情一区二区三区| 亚洲av永久无码精品网站| 成人年无码av片在线观看| 国产热A欧美热A在线视频| 亚洲国产成人无码网站大全| 无码人妻丰满熟妇啪啪欧美| 日韩欧美卡一卡二卡新区| 少妇高潮激情一区二区三| 亚洲av日韩av综合在线观看| 最新国产精品好看的精品| 少妇被粗大的猛烈进出| 亚洲国产成人久久77| 亚洲日本va午夜中文字幕久久 | 亚洲情色av一区二区| 国产精品毛片久久久久久久| 久热99热这里只有精品| 国产午夜福利视频合集| 亚洲 制服 丝袜 无码| 8050午夜二级无码中文字幕| 日本夜爽爽一区二区三区| 国产精品福利自产拍久久| 国产真实精品久久二三区| 亚洲日韩国产精品第一页一区| 万州区| 日韩精品国产二区三区| 国内精品久久久久影院蜜芽| 无码人妻斩一区二区三区| 91精品国产吴梦梦在线观看永久|