<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      flinksql API StreamTableEnvironment StreamStatementSet應用

      1.問題描述

      在應用flink實時消費kafka數據多端中,一般會使用flink原生的addsink或flinkSQL利用SqlDialect,比如消費kafka數據實時寫入hive和kafka一般用兩種方式:
      第一種方式是寫入hive利用SqlDialect,寫入kafka利用flink的旁路輸出流+原生addSink
      第二種方式是寫入hive和kafka都利用SqlDialect的方式,將kafka也當作一個劉表
      

      2.第一種方式核心代碼及現狀

      	DataStream<String> dataStream = environment.addSource(new FlinkKafkaConsumer(topic, new SimpleStringSchema(), props));
              SingleOutputStreamOperator<SipDataInfo> mainStream = dataStream.map(s -> {
                  SipDataInfo sipDataInfo = new SipDataInfo();
                  JSONObject jsonObject = SipFullauditMonitor.complex(s);
      
                  sipDataInfo.setRow(createRow(jsonObject, size, typeArray, column));
                  sipDataInfo.setJsonObject(jsonObject);
                  return sipDataInfo;
              });
      
              final OutputTag<SipDataInfo> kafkaOutputTag = new OutputTag<SipDataInfo>("kafka_stream") {
              };
              final OutputTag<SipDataInfo> hiveOutputTag = new OutputTag<SipDataInfo>("hive_stream") {
              };
      
              SingleOutputStreamOperator<SipDataInfo> sideOutputStream = mainStream.process(new ProcessFunction<SipDataInfo, SipDataInfo>() {
                  @Override
                  public void processElement(SipDataInfo sipDataInfo, Context context, Collector<SipDataInfo> collector) throws Exception {
                      context.output(kafkaOutputTag, sipDataInfo);
                      context.output(hiveOutputTag, sipDataInfo);
                  }
              });
      
              DataStream<SipDataInfo> kafkaStream = sideOutputStream.getSideOutput(kafkaOutputTag);
              DataStream<SipDataInfo> hiveStream = sideOutputStream.getSideOutput(hiveOutputTag);
      
              Properties producerProperties = new Properties();
              producerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ambari1:6667");
      
              kafkaStream.map(sipDataInfo -> sipDataInfo.getJsonObject().toJSONString())
                      .filter(s -> JSONObject.parseObject(s, SipFullauditMonitor.class).getReftaskid() != null && JSONObject.parseObject(s, SipFullauditMonitor.class).getReftaskid() == 0)
                      .addSink(new FlinkKafkaProducer<String>("dwd_" + topic, new SimpleStringSchema(), props));
      
      
              TypeInformation[] tfs = getSqlColumsType(typeArray);
              DataStream<Row> hiveOdsSinkDataStream = hiveStream.map(sipDataInfo -> sipDataInfo.getRow())
                      .returns(Types.ROW_NAMED(column, tfs))
                      .filter(row -> CommonUtil.filter(row));
      
              setHiveParam(parameter, tableEnv);
      
              Table table = tableEnv.fromDataStream(hiveOdsSinkDataStream);
              tableEnv.createTemporaryView("tmp_" + topic, table);
      
              tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
              tableEnv.executeSql(BaseStreamLaucher.parseCreateTableSqlByColumn("ods_" + topic,column, typeArray,new String[]{"pdate","insterhour"},new String[]{"string","string"}));
      
              //寫hive表
              tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
              String insertSql = "insert into ods_" + topic + " PARTITION(pdate='" +
                      new SimpleDateFormat("yyyy-MM-dd").format(new Date()) +
                      "') select " + sinkHiveColumnStr + " from tmp_" + topic;
      
              tableEnv.executeSql(insertSql);
              environment.execute();
          }
      }
      

      3.第二種方式實現的核心代碼

      		DataStream<String> dataStream = environment.addSource(new FlinkKafkaConsumer(topic, new SimpleStringSchema(), props));
      
              TypeInformation[] tfs = getSqlColumsType(typeArray);
              DataStream<Row> rowDataStream = dataStream.map(s -> createRow(SipFullauditMonitor.complex(s), size, typeArray, column))
                      .returns(Types.ROW_NAMED(column, tfs))
                      .filter(row -> CommonUtil.filter(row));
      
              Table table = tableEnv.fromDataStream(rowDataStream);
              setHiveParam(parameter, tableEnv);
              tableEnv.createTemporaryView("tmp_" + topic, table);
      
              //創建hive表
              tableEnv.executeSql(BaseStreamLaucher.parseCreateTableSqlByColumn("ods_" + topic,column, typeArray,new String[]{"pdate","insterhour"},new String[]{"string","string"}));
      
              tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
              //創建kafka表
              tableEnv.executeSql("drop table dwd_sip_fullaudit_monitor");
              String kafkaTableSql = createKafkaTableSqlByColumn("dwd_sip_fullaudit_monitor", parameter, column, typeArray);
              tableEnv.executeSql(kafkaTableSql);
      
              //寫hive表
              String insertHiveSql = "insert into ods_" + topic + " PARTITION(pdate='" +
                      new SimpleDateFormat("yyyy-MM-dd").format(new Date()) +
                      "',insterhour='" + new SimpleDateFormat("yyyyMMddHH").format(new Date()) + "') select " + sinkHiveColumnStr + " from tmp_" + topic;
            
              //寫kafka表
              String insertKafkaSql = "insert into dwd_sip_fullaudit_monitor" + " select " + sinkHiveColumnStr + " from " + "tmp_" + topic;
      
              tableEnv.executeSql(insertKafkaSql);
              tableEnv.executeSql(insertHiveSql);
              
      

      在以上兩種實現方式中,發現flink都會在yarn上啟動兩個應用,這兩個應用雖然都能將數據正常寫入hive和kafka,但是不太好。

      后面通過不斷的嘗試api發現StreamTableEnvironment StreamStatementSet可以解決該問題

      4.應用StreamTableEnvironment StreamStatementSet的核心代碼

      		DataStream<String> dataStream = environment.addSource(new FlinkKafkaConsumer(topic, new SimpleStringSchema(), props));
      
              TypeInformation[] tfs = getSqlColumsType(typeArray);
              DataStream<Row> rowDataStream = dataStream.map(s -> createRow(SipFullauditMonitor.complex(s), size, typeArray, column))
                      .returns(Types.ROW_NAMED(column, tfs))
                      .filter(row -> CommonUtil.filter(row));
      
              Table table = tableEnv.fromDataStream(rowDataStream);
              setHiveParam(parameter, tableEnv);
              tableEnv.createTemporaryView("tmp_" + topic, table);
      
              //創建hive表
              tableEnv.executeSql(BaseStreamLaucher.parseCreateTableSqlByColumn("ods_" + topic,column, typeArray,new String[]{"pdate","insterhour"},new String[]{"string","string"}));
      
              tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
              //創建kafka表
              tableEnv.executeSql("drop table dwd_sip_fullaudit_monitor");
              String kafkaTableSql = createKafkaTableSqlByColumn("dwd_sip_fullaudit_monitor", parameter, column, typeArray);
              tableEnv.executeSql(kafkaTableSql);
      
              StatementSet stmtSet = tableEnv.createStatementSet();
              //寫hive表
              String insertHiveSql = "insert into ods_" + topic + " PARTITION(pdate='" +
                      new SimpleDateFormat("yyyy-MM-dd").format(new Date()) +
                      "',insterhour='" + new SimpleDateFormat("yyyyMMddHH").format(new Date()) + "') select " + sinkHiveColumnStr + " from tmp_" + topic;
              System.out.println("insertHiveSql:"+insertHiveSql);
              //寫kafka表
              String insertKafkaSql = "insert into dwd_sip_fullaudit_monitor" + " select " + sinkHiveColumnStr + " from " + "tmp_" + topic;
      
              stmtSet.addInsertSql(insertHiveSql);
              stmtSet.addInsertSql(insertKafkaSql);
      
              stmtSet.execute();
      

      執行查看flink web界面

      說明:

      StreamStatementSet的這個的應用在初學或者一般場景應用下可能不太容易發現或應用,來看下flink源碼的解釋,紅色部分大概意思是[可以一起優化所有添加的語句,然后將它們作為一個作業提交],重點是作為一個作業提交。但StreamStatementSet并沒有解決前面的第一種場景。所以在實際的應用中不太建議流表和原生addsink混用,flink越往后的版本也是更加提倡應用流表方式去完成流批一體的體系
      

      posted @ 2024-06-21 15:44  技術即藝術  閱讀(247)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 99热久久这里只有精品| 久久96热在精品国产高清| 久久夜色精品久久噜噜亚| 久久天堂无码av网站| 中文字幕乱码中文乱码毛片 | 又污又爽又黄的网站| 不卡一区二区国产在线| 亚洲国产精品日韩在线| 77777亚洲午夜久久多人| 亚洲十八禁一区二区三区| 成年视频人免费网站动漫在线| 宜章县| 欧美寡妇xxxx黑人猛交| 亚洲福利精品一区二区三区| 国产精品美女乱子伦高| 男女啪啪网站| 欧美日韩国产一区二区三区欧| 国产精品久久国产精麻豆99网站| 内射干少妇亚洲69XXX| 高清不卡一区二区三区| 秋霞电影院午夜无码免费视频| 视频一区二区三区刚刚碰| 亚洲精品揄拍自拍首页一| 国产av仑乱内谢| 欧美日韩精品一区二区三区高清视频| 无码人妻精品一区二区三区66| 日韩黄色av一区二区三区| 亚洲精品网站在线观看不卡无广告 | 国产精品中文第一字幕| 国产免费高清69式视频在线观看| 国产精品午夜福利小视频| 年轻女教师hd中字3| 亚洲国产精品毛片av不卡在线| 国产一区二区日韩经典| 嘉义市| 男女猛烈激情xx00免费视频| 91产精品无码无套在线| 日本人妻巨大乳挤奶水免费 | 18禁网站免费无遮挡无码中文| 阜新| 日本一区二区不卡精品|