<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      Flink SQL UNNEST/UDTF 如何實現列轉行?

      在 SQL 任務里面經常會遇到一列轉多行的需求,今天就來總結一下在 Flink SQL 里面如何實現列轉行的,先來看下面的一個具體案例.

       需求:

      原始數據格式如下

      name        data
      JasonLee    [{"content_type":"flink","url":"111"},{"content_type":"spark","url":"222"},{"content_type":"hadoop","url":"333"}]

      data 格式化

      {
       "name": "JasonLee",
       "data": [{
         "content_type": "flink",
         "url": "111"
        }, {
         "content_type": "spark",
         "url": "222"
        },
        {
         "content_type": "hadoop",
         "url": "333"
        }
       ]
      }

      現在希望得到的數據格式是這樣的:

      name    content_type    url
      JasonLee    flink   111
      JasonLee    spark   222
      JasonLee    hadoop  333

      這是一個典型的列轉行或者一行轉多行的場景,需要將 data 列進行拆分成為多行多列,下面介紹兩種實現方式.

      使用 Flink 自帶的 unnest 函數解析

      建表 DDL

      CREATE TABLE kafka_table (
      name string,
      `data` ARRAY<ROW<content_type STRING,url STRING>>
      )
      WITH (
          'connector' = 'kafka', -- 使用 kafka connector
          'topic' = 'test',
          'properties.bootstrap.servers' = 'master:9092,storm1:9092,storm2:9092',  -- broker連接信息
          'properties.group.id' = 'jason_flink_test', -- 消費kafka的group_id
          'scan.startup.mode' = 'latest-offset',  -- 讀取數據的位置
          'format' = 'json',  -- 數據源格式為 json
          'json.fail-on-missing-field' = 'false', -- 字段丟失任務不失敗
          'json.ignore-parse-errors' = 'true'  -- 解析失敗跳過
      )

      這里在定義 data 字段類型的時候需要定義為 ARRAY 類型,因為 unnest 函數需要一個數組類型的參數.

      unnest 解析

      select name,content_type,url
      from kafka_table CROSS JOIN UNNEST(`data`) AS t (content_type,url)
      
      select name,content_type,url
      from kafka_table, UNNEST(`data`) AS t (content_type,url)
      
      select name,content_type,url
      from kafka_table left join UNNEST(`data`) AS t (content_type,url) on true

      使用 自定義 UDTF 函數解析

      自定義表值函數(UDTF),自定義表值函數,將 0 個、1 個或多個標量值作為輸入參數(可以是變長參數)。與自定義的標量函數類似,但與標量函數不同。表值函數可以返回任意數量的行作為輸出,而不僅是 1 個值。返回的行可以由 1 個或多個列組成。調用一次函數輸出多行或多列數據。必須繼承 TableFunction 基類,并實現一個或者多個名為 eval 的方法, 在使用 UDTF 時,需要帶上 LATERAL TABLE兩個關鍵字.

      @FunctionHint(output = @DataTypeHint("ROW<content_type STRING,url STRING>"))
      public class ParserJsonArrayTest extends TableFunction<Row> {
          private static final Logger log = Logger.getLogger(ParserJsonArrayTest.class);
          public void eval(String value) {
              try {
                  JSONArray snapshots = JSONArray.parseArray(value);
                  Iterator<Object> iterator = snapshots.iterator();
                  while (iterator.hasNext()) {
                      JSONObject jsonObject = (JSONObject) iterator.next();
                      String content_type = jsonObject.getString("content_type");
                      String url = jsonObject.getString("url");
                      collect(Row.of(content_type,url));
                  }
              } catch (Exception e) {
                  log.error("parser json failed :" + e.getMessage());
              }
          }
      } 

      自定義 UDTF 解析的時候,就不需要把 data 字段定義成 ARRAY 類型了,直接定義成 STRING 類型就可以了,并且這種方式會更加的靈活,比如還需要過濾數據或者更復雜的一些操作時都可以在 UDTF 里面完成.

      Flink SQL 使用 UDTF

      select name,content_type,url
      from kafka_table CROSS JOIN lateral TABLE (ParserJsonArrayTest(`data`)) AS t (content_type,url)
       
      
      select name,content_type,url
      from kafka_table, lateral TABLE (ParserJsonArrayTest(`data`)) AS t (content_type,url)
       
      
      select name,content_type,url
      from kafka_table left join lateral TABLE (ParserJsonArrayTest(`data`)) AS t (content_type,url) on true

      注意:

      unnest 和 自定義 UDTF 函數在使用的時候都有 3 種寫法,前面兩種寫法的效果其實是一樣的,第三種寫法相當于 left join 的用法.區別在于 CROSS JOIN/INNER JOIN: 對于左側表的每一行,右側 UDTF 不輸出,則這一行不輸出.LEFT JOIN: 對于左側表的每一行,右側 UDTF 不輸出,則這一行會輸出,右側 UDTF 字段為 null。

      打印的結果

      2> JasonLee,flink,111
      2> JasonLee,spark,222
      2> JasonLee,hadoop,333

      總結

      在實際使用的時候如果 unnest 可以滿足需求就直接用 unnest 不需要帶來額外的開發,如果 unnest 函數滿足不了需求,那么就自定義 UDTF 去完成。

      posted @ 2022-11-01 09:17  大數據從業者FelixZh  閱讀(4337)  評論(0)    收藏  舉報
      大數據從業者
      主站蜘蛛池模板: 巨爆乳中文字幕爆乳区| 福利网午夜视频一区二区| 午夜欧美日韩在线视频播放 | 中文字幕国产精品综合| 国产精品毛片一区二区| 亚洲情A成黄在线观看动漫尤物 | 国产自拍在线一区二区三区| 亚洲电影天堂在线国语对白| 亚洲国产精品综合久久2007| japanese边做边乳喷| 色五月丁香六月欧美综合| 日韩精品自拍偷拍一区二区| 国产精品午夜福利91| 日韩精品人妻av一区二区三区| 人妻另类 专区 欧美 制服| 亚洲精品爆乳一区二区H| 久久精品国产国产精品四凭| 免费一本色道久久一区| 亚洲综合国产激情另类一区| 国产伦码精品一区二区| 日本一道一区二区视频| 四虎国产精品永久在线看| 狠狠躁夜夜躁人人爽天天bl| 制服丝袜中文字幕在线| 日韩精品 在线 国产 丝袜| 日本高清一区免费中文视频| 开心一区二区三区激情| 真实单亲乱l仑对白视频| 亚洲欧美色综合影院| 中文字幕久久六月色综合| 国产日韩精品欧美一区灰| 国产精品国产精品偷麻豆| 伊人成伊人成综合网222| 在线国产极品尤物你懂的| 狠狠色狠狠色综合日日不卡| 在线成人| 国产香蕉一区二区三区在线视频 | 国产日产欧产精品精品| 深夜释放自己在线观看| 夜夜春久久天堂亚洲精品| 久久精品熟女亚洲av艳妇|