
      From Flink's official documentation we know that Flink's programming model has four layers: SQL is the topmost API, the Table API is the middle layer, the DataStream/DataSet APIs are the core, and the stateful stream processing layer is the underlying implementation.

      Among the earlier posts in this series:

      flink dataset api使用及原理 introduced the DataSet API.

      flink DataStream API使用及原理 introduced the DataStream API.

      flink中的時間戳如何使用?---Watermark使用及原理 introduced Watermark, the foundation of the low-level implementation.

      flink window實例分析 introduced the concept of windows and how they work.

      Flink中的狀態與容錯 introduced the concept of State and the checkpoint/savepoint fault-tolerance mechanisms.

      The post before last, <使用flink Table &Sql api來構建批量和流式應用(1)Table的基本概念>, covered the basic concepts and usage of Table.

      The previous post, <使用flink Table &Sql api來構建批量和流式應用(2)Table API概述>, gave an overview of the Table API.

      This post looks at what Flink SQL can do and the principles behind it.

      1. SQL functionality

       SQL support lives in org.apache.flink.table.api.TableEnvironment; at the moment Flink supports only SELECT queries and INSERT statements.

      (1) select 

          /**
           * Evaluates a SQL query on registered tables and retrieves the result as a {@link Table}.
           *
           * <p>All tables referenced by the query must be registered in the TableEnvironment.
           * A {@link Table} is automatically registered when its {@link Table#toString()} method is
           * called, for example when it is embedded into a String.
           * Hence, SQL queries can directly reference a {@link Table} as follows:
           *
           * <pre>
           * {@code
           *   Table table = ...;
           *   String tableName = table.toString();
           *   // the table is not registered to the table environment
           *   tEnv.sqlQuery("SELECT * FROM tableName");
           * }
           * </pre>
           *
           * @param query The SQL query to evaluate.
           * @return The result of the query as Table
           */
          Table sqlQuery(String query);

      (2) update (currently only INSERT is supported)

          /**
           * Evaluates a SQL statement such as INSERT, UPDATE or DELETE; or a DDL statement;
           * NOTE: Currently only SQL INSERT statements are supported.
           *
           * <p>All tables referenced by the query must be registered in the TableEnvironment.
           * A {@link Table} is automatically registered when its {@link Table#toString()} method is
           * called, for example when it is embedded into a String.
           * Hence, SQL queries can directly reference a {@link Table} as follows:
           *
           * <pre>
           * {@code
           *   // register the configured table sink into which the result is inserted.
           *   tEnv.registerTableSink("sinkTable", configuredSink);
           *   Table sourceTable = ...
           *   String tableName = sourceTable.toString();
           *   // sourceTable is not registered to the table environment
           *   tEnv.sqlUpdate(s"INSERT INTO sinkTable SELECT * FROM tableName");
           * }
           * </pre>
           *
           * @param stmt The SQL statement to evaluate.
           */
          void sqlUpdate(String stmt);
      
          /**
           * Evaluates a SQL statement such as INSERT, UPDATE or DELETE; or a DDL statement;
           * NOTE: Currently only SQL INSERT statements are supported.
           *
           * <p>All tables referenced by the query must be registered in the TableEnvironment.
           * A {@link Table} is automatically registered when its {@link Table#toString()} method is
           * called, for example when it is embedded into a String.
           * Hence, SQL queries can directly reference a {@link Table} as follows:
           *
           * <pre>
           * {@code
           *   // register the configured table sink into which the result is inserted.
           *   tEnv.registerTableSink("sinkTable", configuredSink);
           *   Table sourceTable = ...
           *   String tableName = sourceTable.toString();
           *   // sourceTable is not registered to the table environment
           *   tEnv.sqlUpdate(s"INSERT INTO sinkTable SELECT * FROM tableName", config);
           * }
           * </pre>
           *
           * @param stmt The SQL statement to evaluate.
           * @param config The {@link QueryConfig} to use.
           */
          void sqlUpdate(String stmt, QueryConfig config);
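
      To make the API concrete, here is a minimal sketch of driving sqlUpdate end to end. It assumes a table named Orders (with product and amount columns) has already been registered; the table name, sink name, and CSV output path are placeholders for illustration, not part of the Flink sources above.

      import org.apache.flink.api.common.typeinfo.TypeInformation;
      import org.apache.flink.api.common.typeinfo.Types;
      import org.apache.flink.table.api.java.StreamTableEnvironment;
      import org.apache.flink.table.sinks.CsvTableSink;

      public class SqlUpdateSketch {

          // assumes "Orders(product: STRING, amount: INT)" is already registered
          public static void writeLargeOrders(StreamTableEnvironment tEnv) {
              // register the sink table that the INSERT below writes into;
              // "/tmp/large-orders.csv" is a placeholder path
              tEnv.registerTableSink(
                  "sinkTable",
                  new String[] {"product", "amount"},
                  new TypeInformation<?>[] {Types.STRING, Types.INT},
                  new CsvTableSink("/tmp/large-orders.csv", ","));

              // only INSERT is accepted; any other statement throws TableException
              tEnv.sqlUpdate("INSERT INTO sinkTable SELECT product, amount FROM Orders WHERE amount > 2");
          }
      }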

      2. How SQL parsing works

      Apache Calcite is a SQL engine aimed at the Hadoop ecosystem. It provides standard SQL, a range of query optimizations, and connectivity to many kinds of data sources; it also offers query engines for OLAP and stream processing. Since entering the Apache incubator in 2013 it has become increasingly prominent in the Hadoop world and has been integrated by many projects: Flink, Storm, Drill, and Phoenix all rely on it for SQL parsing and optimization.
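
      As a first taste of Calcite on its own (independent of Flink's planner setup), the following minimal sketch parses a query string into a SqlNode tree; the Orders table and its columns are made up for illustration.

      import org.apache.calcite.sql.SqlNode;
      import org.apache.calcite.sql.parser.SqlParser;

      public class CalciteParseSketch {
          public static void main(String[] args) throws Exception {
              // parse with Calcite's default parser configuration
              SqlParser parser = SqlParser.create(
                  "SELECT product, SUM(amount) FROM Orders GROUP BY product");
              SqlNode ast = parser.parseQuery();  // a SqlSelect for this statement
              System.out.println(ast.getKind());  // SELECT
              System.out.println(ast);            // the statement re-rendered from the tree
          }
      }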

      Now let's run Flink's own demo and see what a SQL statement goes through when it is parsed.

      (1) select 

      package org.apache.flink.table.examples.java;
      
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.table.api.Table;
      import org.apache.flink.table.api.java.StreamTableEnvironment;
      
      import java.util.Arrays;
      
      /**
       * Simple example for demonstrating the use of SQL on a Stream Table in Java.
       *
       * <p>This example shows how to:
       *  - Convert DataStreams to Tables
       *  - Register a Table under a name
       *  - Run a StreamSQL query on the registered Table
       *
       */
      public class StreamSQLExample {
      
          // *************************************************************************
          //     PROGRAM
          // *************************************************************************
      
          public static void main(String[] args) throws Exception {
      
              // set up execution environment
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
      
              DataStream<Order> orderA = env.fromCollection(Arrays.asList(
                  new Order(1L, "beer", 3),
                  new Order(1L, "diaper", 4),
                  new Order(3L, "rubber", 2)));
      
              DataStream<Order> orderB = env.fromCollection(Arrays.asList(
                  new Order(2L, "pen", 3),
                  new Order(2L, "rubber", 3),
                  new Order(4L, "beer", 1)));
      
              // convert DataStream to Table
              Table tableA = tEnv.fromDataStream(orderA, "user, product, amount");
              // register DataStream as Table
              tEnv.registerDataStream("OrderB", orderB, "user, product, amount");
      
              // union the two tables
              Table result = tEnv.sqlQuery("SELECT * FROM " + tableA + " WHERE amount > 2 UNION ALL " +
                              "SELECT * FROM OrderB WHERE amount < 2");
      
              tEnv.toAppendStream(result, Order.class).print();
      
              env.execute();
          }
      
          // *************************************************************************
          //     USER DATA TYPES
          // *************************************************************************
      
          /**
           * Simple POJO.
           */
          public static class Order {
              public Long user;
              public String product;
              public int amount;
      
              public Order() {
              }
      
              public Order(Long user, String product, int amount) {
                  this.user = user;
                  this.product = product;
                  this.amount = amount;
              }
      
              @Override
              public String toString() {
                  return "Order{" +
                      "user=" + user +
                      ", product='" + product + '\'' +
                      ", amount=" + amount +
                      '}';
              }
          }
      }
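
      With these inputs, three rows survive the filters: (1, beer, 3) and (1, diaper, 4) pass amount > 2 on tableA, and (4, beer, 1) passes amount < 2 on OrderB. Since this is a streaming job, the order in which the rows are printed is not deterministic.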

      sqlQuery is implemented as follows:

       override def sqlQuery(query: String): Table = {
          val planner = getFlinkPlanner
          // parse the sql query
          val parsed = planner.parse(query)
          if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
            // validate the sql query
            val validated = planner.validate(parsed)
            // transform to a relational tree
            val relational = planner.rel(validated)
            new TableImpl(this, new PlannerQueryOperation(relational.rel))
          } else {
            throw new TableException(
              "Unsupported SQL query! sqlQuery() only accepts SQL queries of type " +
                "SELECT, UNION, INTERSECT, EXCEPT, VALUES, and ORDER_BY.")
          }
        }

      >>parse the sql query

      In Calcite the parse tree is represented by SqlNode; a SELECT statement becomes a SqlSelect:

        public SqlSelect(SqlParserPos pos,
            SqlNodeList keywordList,
            SqlNodeList selectList,
            SqlNode from,
            SqlNode where,
            SqlNodeList groupBy,
            SqlNode having,
            SqlNodeList windowDecls,
            SqlNodeList orderBy,
            SqlNode offset,
            SqlNode fetch) {
          super(pos);
          this.keywordList = Objects.requireNonNull(keywordList != null
              ? keywordList : new SqlNodeList(pos));
          this.selectList = selectList;
          this.from = from;
          this.where = where;
          this.groupBy = groupBy;
          this.having = having;
          this.windowDecls = Objects.requireNonNull(windowDecls != null
              ? windowDecls : new SqlNodeList(pos));
          this.orderBy = orderBy;
          this.offset = offset;
          this.fetch = fetch;
        }
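
      To make those constructor fields concrete, here is a small sketch (same made-up Orders table as before) that parses a SELECT and reads the clauses back out of the resulting SqlSelect.

      import org.apache.calcite.sql.SqlSelect;
      import org.apache.calcite.sql.parser.SqlParser;

      public class SqlSelectFieldsSketch {
          public static void main(String[] args) throws Exception {
              SqlSelect select = (SqlSelect) SqlParser.create(
                  "SELECT product FROM Orders WHERE amount > 2").parseQuery();
              System.out.println(select.getSelectList()); // product
              System.out.println(select.getFrom());       // Orders
              System.out.println(select.getWhere());      // amount > 2
          }
      }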

      >>validate the sql query

      SqlValidatorImpl then validates the SqlNode:

          public SqlNode validate(SqlNode topNode) {
              SqlValidatorScope scope = new EmptyScope(this);
              scope = new CatalogScope(scope, ImmutableList.of("CATALOG"));
              final SqlNode topNode2 = validateScopedExpression(topNode, scope);
              final RelDataType type = getValidatedNodeType(topNode2);
              Util.discard(type);
              return topNode2;
          }
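
      In short, validation resolves every table and column identifier against the catalog in scope and derives and type-checks the row type of each node (the getValidatedNodeType call above); only a validated tree is handed on to the relational converter.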

      >>transform to a relational tree

      SqlToRelConverter.java

      /**
         * Converts an unvalidated query's parse tree into a relational expression.
         *
         * @param query           Query to convert
         * @param needsValidation Whether to validate the query before converting;
         *                        <code>false</code> if the query has already been
         *                        validated.
         * @param top             Whether the query is top-level, say if its result
         *                        will become a JDBC result set; <code>false</code> if
         *                        the query will be part of a view.
         */
        public RelRoot convertQuery(
            SqlNode query,
            final boolean needsValidation,
            final boolean top) {
          if (needsValidation) {
            query = validator.validate(query);
          }
      
          RelMetadataQuery.THREAD_PROVIDERS.set(
              JaninoRelMetadataProvider.of(cluster.getMetadataProvider()));
          RelNode result = convertQueryRecursive(query, top, null).rel;
          if (top) {
            if (isStream(query)) {
              result = new LogicalDelta(cluster, result.getTraitSet(), result);
            }
          }
          RelCollation collation = RelCollations.EMPTY;
          if (!query.isA(SqlKind.DML)) {
            if (isOrdered(query)) {
              collation = requiredCollation(result);
            }
          }
          checkConvertedType(query, result);
      
          if (SQL2REL_LOGGER.isDebugEnabled()) {
            SQL2REL_LOGGER.debug(
                RelOptUtil.dumpPlan("Plan after converting SqlNode to RelNode",
                    result, SqlExplainFormat.TEXT,
                    SqlExplainLevel.EXPPLAN_ATTRIBUTES));
          }
      
          final RelDataType validatedRowType = validator.getValidatedNodeType(query);
          return RelRoot.of(result, validatedRowType, query.getKind())
              .withCollation(collation);
        }
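
      The same parse -> validate -> convert pipeline can be reproduced with Calcite's standalone Planner API. A minimal sketch follows; it uses VALUES so that no tables need to be registered, which is an illustration choice, not how Flink wires it up.

      import org.apache.calcite.plan.RelOptUtil;
      import org.apache.calcite.rel.RelRoot;
      import org.apache.calcite.schema.SchemaPlus;
      import org.apache.calcite.sql.SqlNode;
      import org.apache.calcite.tools.FrameworkConfig;
      import org.apache.calcite.tools.Frameworks;
      import org.apache.calcite.tools.Planner;

      public class Sql2RelSketch {
          public static void main(String[] args) throws Exception {
              SchemaPlus rootSchema = Frameworks.createRootSchema(true);
              FrameworkConfig config = Frameworks.newConfigBuilder()
                  .defaultSchema(rootSchema)
                  .build();
              Planner planner = Frameworks.getPlanner(config);

              SqlNode parsed = planner.parse("VALUES (1, 'beer')"); // SqlNode tree
              SqlNode validated = planner.validate(parsed);         // type-checked tree
              RelRoot root = planner.rel(validated);                // relational tree
              System.out.println(RelOptUtil.toString(root.rel));    // LogicalValues(...)
          }
      }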

       

      (2) update

      The implementation:

        override def sqlUpdate(stmt: String): Unit = {
          sqlUpdate(stmt, this.queryConfig)
        }
      
        override def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
          val planner = getFlinkPlanner
          // parse the sql query
          val parsed = planner.parse(stmt)
          parsed match {
            case insert: SqlInsert =>
              // validate the SQL query
              val query = insert.getSource
              val validatedQuery = planner.validate(query)
      
              // get query result as Table
              val queryResult = new TableImpl(this,
                new PlannerQueryOperation(planner.rel(validatedQuery).rel))
      
              // get name of sink table
              val targetTablePath = insert.getTargetTable.asInstanceOf[SqlIdentifier].names
      
              // insert query result into sink table
              insertInto(queryResult, config, targetTablePath.asScala:_*)
            case _ =>
              throw new TableException(
                "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.")
          }
        }

      The steps mirror the select path (parse, validate, convert), so they are not repeated here.

      3. Summary

      The Flink Table API & SQL provide a single, unified interface for relational queries over both streaming and static data, reusing Calcite's SQL parser and query-optimization framework. The design is built on top of Flink's existing APIs: the DataStream API delivers low-latency, high-throughput stream processing with exactly-once semantics and event-time support, while the DataSet API offers stable, efficient in-memory operators and pipelined data exchange. Any improvement to Flink's core APIs and engine automatically carries over to the Table API and SQL.

