<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      大數(shù)據(jù)課程之Flink

       

      大數(shù)據(jù)課程之Flink

      第一章 Flink簡(jiǎn)介

      1、初識(shí)Flink

       

      Apache Flink是一個(gè)框架和分布式處理引擎,用于對(duì)無(wú)界和有界數(shù)據(jù)流進(jìn)行有狀態(tài)計(jì)算。Flink被設(shè)計(jì)在所有常見(jiàn)的集群環(huán)境中運(yùn)行,以?xún)?nèi)存執(zhí)行速度和任意規(guī)模來(lái)執(zhí)行計(jì)算。

       

       

       

      Flink起源于Stratosphere項(xiàng)目,Stratosphere是在2010~2014年由3所地處柏林的大學(xué)和歐洲的一些其他的大學(xué)共同進(jìn)行的研究項(xiàng)目,2014年4月Stratosphere的代碼被復(fù)制并捐贈(zèng)給了Apache軟件基金會(huì),參加這個(gè)孵化項(xiàng)目的初始成員是Stratosphere系統(tǒng)的核心開(kāi)發(fā)人員,2014年12月,F(xiàn)link一躍成為Apache軟件基金會(huì)的頂級(jí)項(xiàng)目。

       

       

      在德語(yǔ)中,F(xiàn)link一詞表示快速和靈巧,項(xiàng)目采用一只松鼠的彩色圖案作為logo,這不僅是因?yàn)樗墒缶哂锌焖俸挽`巧的特點(diǎn),還因?yàn)榘亓值乃墒笥幸环N迷人的紅棕色,而Flink的松鼠logo擁有可愛(ài)的尾巴,尾巴的顏色與Apache軟件基金會(huì)的logo顏色相呼應(yīng),也就是說(shuō),這是一只Apache風(fēng)格的松鼠。

       

      圖 Flink Logo

       

       

      Flink雖然誕生的早(2010年),但是其實(shí)是起大早趕晚集,直到2015年才開(kāi)始突然爆發(fā)熱度。

      在Flink被apache提升為頂級(jí)項(xiàng)目之后,阿里實(shí)時(shí)計(jì)算團(tuán)隊(duì)決定在阿里內(nèi)部建立一個(gè) Flink 分支 Blink,并對(duì) Flink 進(jìn)行大量的修改和完善,讓其適應(yīng)阿里巴巴這種超大規(guī)模的業(yè)務(wù)場(chǎng)景。

      Blink由2016年上線,服務(wù)于阿里集團(tuán)內(nèi)部搜索、推薦、廣告和螞蟻等大量核心實(shí)時(shí)業(yè)務(wù)。與2019年1月Blink正式開(kāi)源,目前阿里70%的技術(shù)部門(mén)都有使用該版本。

      Blink比起Flink的優(yōu)勢(shì)就是對(duì)SQL語(yǔ)法的更完善的支持以及執(zhí)行SQL的性能提升。

       

       

       

       

       

       

       

       

       

      2  Flink的重要特點(diǎn)

       

      2.1  事件驅(qū)動(dòng)型(Event-driven)

      事件驅(qū)動(dòng)型應(yīng)用是一類(lèi)具有狀態(tài)的應(yīng)用,它從一個(gè)或多個(gè)事件流提取數(shù)據(jù),并根據(jù)到來(lái)的事件觸發(fā)計(jì)算、狀態(tài)更新或其他外部動(dòng)作。比較典型的就是以kafka為代表的消息隊(duì)列幾乎都是事件驅(qū)動(dòng)型應(yīng)用。

       

      與之不同的就是SparkStreaming微批次,如圖:

       

         事件驅(qū)動(dòng)型:

       

       

       

      2.2 流與批的世界觀

           批處理的特點(diǎn)是有界、持久、大量,非常適合需要訪問(wèn)全套記錄才能完成的計(jì)算工作,一般用于離線統(tǒng)計(jì)。

      流處理的特點(diǎn)是無(wú)界、實(shí)時(shí),  無(wú)需針對(duì)整個(gè)數(shù)據(jù)集執(zhí)行操作,而是對(duì)通過(guò)系統(tǒng)傳輸?shù)拿總€(gè)數(shù)據(jù)項(xiàng)執(zhí)行操作,一般用于實(shí)時(shí)統(tǒng)計(jì)。

       

         在spark的世界觀中,一切都是由批次組成的,離線數(shù)據(jù)是一個(gè)大批次,而實(shí)時(shí)數(shù)據(jù)是由一個(gè)一個(gè)無(wú)限的小批次組成的。

         而在flink的世界觀中,一切都是由流組成的,離線數(shù)據(jù)是有界限的流,實(shí)時(shí)數(shù)據(jù)是一個(gè)沒(méi)有界限的流,這就是所謂的有界流和無(wú)界流。

       

      無(wú)界數(shù)據(jù)流無(wú)界數(shù)據(jù)流有一個(gè)開(kāi)始但是沒(méi)有結(jié)束,它們不會(huì)在生成時(shí)終止并提供數(shù)據(jù),必須連續(xù)處理無(wú)界流,也就是說(shuō)必須在獲取后立即處理event。對(duì)于無(wú)界數(shù)據(jù)流我們無(wú)法等待所有數(shù)據(jù)都到達(dá),因?yàn)檩斎胧菬o(wú)界的,并且在任何時(shí)間點(diǎn)都不會(huì)完成。處理無(wú)界數(shù)據(jù)通常要求以特定順序(例如事件發(fā)生的順序)獲取event,以便能夠推斷結(jié)果完整性。

      有界數(shù)據(jù)流有界數(shù)據(jù)流有明確定義的開(kāi)始和結(jié)束,可以在執(zhí)行任何計(jì)算之前通過(guò)獲取所有數(shù)據(jù)來(lái)處理有界流,處理有界流不需要有序獲取,因?yàn)榭梢允冀K對(duì)有界數(shù)據(jù)集進(jìn)行排序,有界流的處理也稱(chēng)為批處理。

          這種以流為世界觀的架構(gòu),獲得的最大好處就是具有極低的延遲。

       

      2.3 分層api

       

       

      最底層級(jí)的抽象僅僅提供了有狀態(tài)流,它將通過(guò)過(guò)程函數(shù)(Process Function)被嵌入到DataStream API中。底層過(guò)程函數(shù)(Process Function) 與 DataStream API 相集成,使其可以對(duì)某些特定的操作進(jìn)行底層的抽象,它允許用戶(hù)可以自由地處理來(lái)自一個(gè)或多個(gè)數(shù)據(jù)流的事件,并使用一致的容錯(cuò)的狀態(tài)。除此之外,用戶(hù)可以注冊(cè)事件時(shí)間并處理時(shí)間回調(diào),從而使程序可以處理復(fù)雜的計(jì)算。

      實(shí)際上,大多數(shù)應(yīng)用并不需要上述的底層抽象,而是針對(duì)核心API(Core APIs) 進(jìn)行編程,比如DataStream API(有界或無(wú)界流數(shù)據(jù))以及DataSet API(有界數(shù)據(jù)集)。這些API為數(shù)據(jù)處理提供了通用的構(gòu)建模塊,比如由用戶(hù)定義的多種形式的轉(zhuǎn)換(transformations),連接(joins),聚合(aggregations),窗口操作(windows)等等。DataSet API 為有界數(shù)據(jù)集提供了額外的支持,例如循環(huán)與迭代。這些API處理的數(shù)據(jù)類(lèi)型以類(lèi)(classes)的形式由各自的編程語(yǔ)言所表示。

      Table API 是以表為中心的聲明式編程,其中表可能會(huì)動(dòng)態(tài)變化(在表達(dá)流數(shù)據(jù)時(shí))。Table API遵循(擴(kuò)展的)關(guān)系模型:表有二維數(shù)據(jù)結(jié)構(gòu)(schema)(類(lèi)似于關(guān)系數(shù)據(jù)庫(kù)中的表),同時(shí)API提供可比較的操作,例如select、project、join、group-by、aggregate等。Table API程序聲明式地定義了什么邏輯操作應(yīng)該執(zhí)行,而不是準(zhǔn)確地確定這些操作代碼的看上去如何 。 盡管Table API可以通過(guò)多種類(lèi)型的用戶(hù)自定義函數(shù)(UDF)進(jìn)行擴(kuò)展,其仍不如核心API更具表達(dá)能力,但是使用起來(lái)卻更加簡(jiǎn)潔(代碼量更少)。除此之外,Table API程序在執(zhí)行之前會(huì)經(jīng)過(guò)內(nèi)置優(yōu)化器進(jìn)行優(yōu)化。

      你可以在表與 DataStream/DataSet 之間無(wú)縫切換,以允許程序?qū)?nbsp;Table API 與 DataStream 以及 DataSet 混合使用。

      Flink提供的最高層級(jí)的抽象是 SQL 。這一層抽象在語(yǔ)法與表達(dá)能力上與 Table API 類(lèi)似,但是是以SQL查詢(xún)表達(dá)式的形式表現(xiàn)程序。SQL抽象與Table API交互密切,同時(shí)SQL查詢(xún)可以直接在Table API定義的表上執(zhí)行。

      2.4  支持有狀態(tài)計(jì)算

      Flink在1.4版本中實(shí)現(xiàn)了狀態(tài)管理,所謂狀態(tài)管理就是在流失計(jì)算過(guò)程中將算子的中間結(jié)果保存在內(nèi)存或者文件系統(tǒng)中,等下一個(gè)事件進(jìn)入算子后可以讓當(dāng)前事件的值與歷史值進(jìn)行匯總累計(jì)。

       

      2.5  支持exactly-once語(yǔ)義

       

      在分布式系統(tǒng)中,組成系統(tǒng)的各個(gè)計(jì)算機(jī)是獨(dú)立的。這些計(jì)算機(jī)有可能fail。

      一個(gè)sender發(fā)送一條message到receiver。根據(jù)receiver出現(xiàn)fail時(shí)sender如何處理fail,可以將message delivery分為三種語(yǔ)義:

       

      At Most once: 對(duì)于一條message,receiver最多收到一次(0次或1次).

      可以達(dá)成At Most Once的策略:

      sender把message發(fā)送給receiver.無(wú)論receiver是否收到message,sender都不再重發(fā)message.

       

      At Least once: 對(duì)于一條message,receiver最少收到一次(1次及以上).

      可以達(dá)成At Least Once的策略:

      sender把message發(fā)送給receiver.當(dāng)receiver在規(guī)定時(shí)間內(nèi)沒(méi)有回復(fù)ACK或回復(fù)了error信息,那么sender重發(fā)這條message給receiver,直到sender收到receiver的ACK.

       

      Exactly once: 對(duì)于一條message,receiver確保只收到一次

       

       

      2.6   支持事件時(shí)間(EventTime

          目前大多數(shù)框架時(shí)間窗口計(jì)算,都是采用當(dāng)前系統(tǒng)時(shí)間,以時(shí)間為單位進(jìn)行的聚合計(jì)算只能反應(yīng)數(shù)據(jù)到達(dá)計(jì)算引擎的時(shí)間,而并不是實(shí)際業(yè)務(wù)時(shí)間

       

       

       

       

       

      第二章 快速上手

      1  搭建maven工程 flink-2019

      1.1、pom文件

       

      <?xml version="1.0" encoding="UTF-8"?>
      <project xmlns="http://maven.apache.org/POM/4.0.0"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
          <modelVersion>4.0.0</modelVersion>

          <groupId>com.atguigu.flink</groupId>
          <artifactId>flink</artifactId>
          <version>1.0-SNAPSHOT</version>

          <dependencies>
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-scala_2.11</artifactId>
                  <version>1.7.0</version>
              </dependency>

              <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-streaming-scala_2.11</artifactId>
                  <version>1.7.0</version>
              </dependency>
          </dependencies>

       

      <build>
          <plugins>
          <!-- 該插件用于將Scala代碼編譯成class文件 -->
          <plugin>
              <groupId>net.alchim31.maven</groupId>
              <artifactId>scala-maven-plugin</artifactId>
              <version>3.4.6</version>
              <executions>
                  <execution>
                      <!-- 聲明綁定到maven的compile階段 -->
                      <goals>
                          <goal>compile</goal>
                          <goal>testCompile</goal>
                      </goals>
                  </execution>
              </executions>
          </plugin>

              <plugin>
                  <groupId>org.apache.maven.plugins</groupId>
                  <artifactId>maven-assembly-plugin</artifactId>
                  <version>3.0.0</version>
                  <configuration>
                      <descriptorRefs>
                          <descriptorRef>jar-with-dependencies</descriptorRef>
                      </descriptorRefs>
                  </configuration>
                  <executions>
                      <execution>
                          <id>make-assembly</id>
                          <phase>package</phase>
                          <goals>
                              <goal>single</goal>
                          </goals>
                      </execution>
                  </executions>
              </plugin>
          </plugins>
      </build>


      </project>

       

       

      1.2 添加scala框架 和 scala文件夾

       

      2 批處理wordcount

      import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
      
      object WordCountBeach {
          def main(args: Array[String]): Unit = {
              val env = ExecutionEnvironment.getExecutionEnvironment
              val input = "F:\\input\\words.txt"
              val ds: DataSet[String] = env.readTextFile(input)
              import org.apache.flink.api.scala._
              val aggDs = ds.flatMap(_.split(" ")).map((_, 1)).groupBy(0).sum(1)
              aggDs.print()
          }
      
      }
      

        

       

      def main(args: Array[String]): Unit = {

        //構(gòu)造執(zhí)行環(huán)境
        val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
        //讀取文件
        val input = "file:///d:/temp/hello.txt"
        val ds: DataSet[String] = env.readTextFile(input)
        // 其中flatMap 和Map 中  需要引入隱式轉(zhuǎn)換
        import org.apache.flink.api.scala.createTypeInformation
        //經(jīng)過(guò)groupby進(jìn)行分組,sum進(jìn)行聚合
        val aggDs: AggregateDataSet[(String, Int)] = ds.flatMap(_.split(" ")).map((_, 1)).groupBy(0).sum(1)
        // 打印
        aggDs.print()

      }

       

       

      注意:Flink程序支持java 和 scala兩種語(yǔ)言,本課程中以scala語(yǔ)言為主。

      在引入包中,有java和scala兩種包時(shí)注意要使用scala的包

       

       

       

      3  流處理 wordcount

       

       

      import org.apache.flink.api.java.utils.ParameterTool
      import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

      object StreamWcApp {

        def main(args: Array[String]): Unit = {
          //從外部命令中獲取參數(shù)
          val tool: ParameterTool = ParameterTool.fromArgs(args)
          val host: String = tool.get("host")
          val port: Int = tool.get("port").toInt

          //創(chuàng)建流處理環(huán)境
          val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
          //接收socket文本流
          val textDstream: DataStream[String] = env.socketTextStream(host,port)
         // flatMap和Map需要引用的隱式轉(zhuǎn)換
          import org.apache.flink.api.scala._
         //處理 分組并且sum聚合
          val dStream: DataStream[(String, Int)] = textDstream.flatMap(_.split(" ")).filter(_.nonEmpty).map((_,1)).keyBy(0).sum(1)
         //打印
          dStream.print()
          
          env.execute()
        }

       

       

       

       

      測(cè)試

      在linux系統(tǒng)中用

       nc -lk  7777

      進(jìn)行發(fā)送測(cè)試

       

       

      第三章 Flink部署

       

      1 standalone模式

      1.1  安裝

      解壓縮  flink-1.7.0-bin-hadoop27-scala_2.11.tgz

       

      修改 flink/conf/flink-conf.yaml 文件

       

       注意:yaml文件必須是冒號(hào)+空格

      修改 /conf/slave文件

       

       

      .分發(fā)給 另外兩臺(tái)機(jī)子

       

      啟動(dòng)

       

       

       

      訪問(wèn)http://hadoop1:8081

       

       

      1.2 提交任務(wù)

       

      1準(zhǔn)備數(shù)據(jù)文件

       

       

      2把含數(shù)據(jù)文件的文件夾,分發(fā)到taskmanage 機(jī)器中

       

       

      由于讀取數(shù)據(jù)是從本地磁盤(pán)讀取,實(shí)際任務(wù)會(huì)被分發(fā)到taskmanage的機(jī)器中,所以要把目標(biāo)文件分發(fā)。

       

      3) 執(zhí)行程序 

      ./flink run -c com.atguigu.flink.app.BatchWcApp  /ext/flink0503-1.0-SNAPSHOT.jar  --input /applog/flink/input.txt --output /applog/flink/output.csv

       

       

       

       

      4到目標(biāo)文件夾中查看計(jì)算結(jié)果

      注意:計(jì)算結(jié)果根據(jù)會(huì)保存到taskmanage的機(jī)器下,不會(huì)再jobmanage下。

       

       

       

       

      5在webui控制臺(tái)查看計(jì)算過(guò)程

       

       

       

      2  yarn模式

       

      1) 啟動(dòng)hadoop集群

       

      2) 啟動(dòng)yarn-session

       

       

      ./yarn-session.sh -n 2 -s 2 -jm 1024 -tm 1024 -nm test -d

      其中:

      -n(--container):TaskManager的數(shù)量。

      -s(--slots): 每個(gè)TaskManager的slot數(shù)量,默認(rèn)一個(gè)slot一個(gè)core,默認(rèn)每個(gè)taskmanager的slot的個(gè)數(shù)為1,有時(shí)可以多一些taskmanager,做冗余。

      -jm:JobManager的內(nèi)存(單位MB)。

      -tm:每個(gè)taskmanager的內(nèi)存(單位MB)。

      -nm:yarn 的appName(現(xiàn)在yarn的ui上的名字)。

      -d:后臺(tái)執(zhí)行。

       

       

       

       

      3) 執(zhí)行任務(wù)

      ./flink run  -m yarn-cluster -c com.atguigu.flink.app.BatchWcApp  /ext/flink0503-1.0-SNAPSHOT.jar  --input /applog/flink/input.txt --output /applog/flink/output5.csv

       

       

       

       

      4) 去yarn控制臺(tái)查看任務(wù)狀態(tài)

       

       

       

       

       

       

       

       

       

       

      第四章 Flink運(yùn)行架構(gòu)

      任務(wù)提交流程(yarn模式)

       

      圖 Yarn模式任務(wù)提交流程

      Flink任務(wù)提交后,Client向HDFS上傳Flink的Jar包和配置,之后向Yarn ResourceManager提交任務(wù),ResourceManager分配Container資源并通知對(duì)應(yīng)的NodeManager啟動(dòng)ApplicationMaster,ApplicationMaster啟動(dòng)后加載Flink的Jar包和配置構(gòu)建環(huán)境,然后啟動(dòng)JobManager,之后ApplicationMaster向ResourceManager申請(qǐng)資源啟動(dòng)TaskManager,ResourceManager分配Container資源后,由ApplicationMaster通知資源所在節(jié)點(diǎn)的NodeManager啟動(dòng)TaskManager,NodeManager加載Flink的Jar包和配置構(gòu)建環(huán)境并啟動(dòng)TaskManager,TaskManager啟動(dòng)后向JobManager發(fā)送心跳包,并等待JobManager向其分配任務(wù)。

      任務(wù)調(diào)度原理

      圖 任務(wù)調(diào)度原理

      客戶(hù)端不是運(yùn)行時(shí)和程序執(zhí)行的一部分,但它用于準(zhǔn)備并發(fā)送dataflow(JobGraph)給Master(JobManager),然后,客戶(hù)端斷開(kāi)連接或者維持連接以等待接收計(jì)算結(jié)果。

       

      當(dāng) Flink 集群?jiǎn)?dòng)后,首先會(huì)啟動(dòng)一個(gè) JobManger 和一個(gè)或多個(gè)的 TaskManager。由 Client 提交任務(wù)給 JobManager,JobManager 再調(diào)度任務(wù)到各個(gè) TaskManager 去執(zhí)行,然后 TaskManager 將心跳和統(tǒng)計(jì)信息匯報(bào)給 JobManager。TaskManager 之間以流的形式進(jìn)行數(shù)據(jù)的傳輸。上述三者均為獨(dú)立的 JVM 進(jìn)程。

      Client 為提交 Job 的客戶(hù)端,可以是運(yùn)行在任何機(jī)器上(與 JobManager 環(huán)境連通即可)。提交 Job 后,Client 可以結(jié)束進(jìn)程(Streaming的任務(wù)),也可以不結(jié)束并等待結(jié)果返回。

      JobManager 主要負(fù)責(zé)調(diào)度 Job 并協(xié)調(diào) Task 做 checkpoint,職責(zé)上很像 Storm 的 Nimbus。從 Client 處接收到 Job 和 JAR 包等資源后,會(huì)生成優(yōu)化后的執(zhí)行計(jì)劃,并以 Task 的單元調(diào)度到各個(gè) TaskManager 去執(zhí)行。

      TaskManager 在啟動(dòng)的時(shí)候就設(shè)置好了槽位數(shù)(Slot),每個(gè) slot 能啟動(dòng)一個(gè) Task,Task 為線程。從 JobManager 處接收需要部署的 Task,部署啟動(dòng)后,與自己的上游建立 Netty 連接,接收數(shù)據(jù)并處理。

       

       

      關(guān)于執(zhí)行圖

      Flink 中的執(zhí)行圖可以分成四層:StreamGraph -> JobGraph -> ExecutionGraph -> 物理執(zhí)行圖

      StreamGraph:是根據(jù)用戶(hù)通過(guò) Stream API 編寫(xiě)的代碼生成的最初的圖。用來(lái)表示程序的拓?fù)浣Y(jié)構(gòu)。

      JobGraph:StreamGraph經(jīng)過(guò)優(yōu)化后生成了 JobGraph,提交給 JobManager 的數(shù)據(jù)結(jié)構(gòu)。主要的優(yōu)化為,將多個(gè)符合條件的節(jié)點(diǎn) chain 在一起作為一個(gè)節(jié)點(diǎn),這樣可以減少數(shù)據(jù)在節(jié)點(diǎn)之間流動(dòng)所需要的序列化/反序列化/傳輸消耗。

      ExecutionGraph:JobManager 根據(jù) JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的并行化版本,是調(diào)度層最核心的數(shù)據(jù)結(jié)構(gòu)。

      物理執(zhí)行圖:JobManager 根據(jù) ExecutionGraph 對(duì) Job 進(jìn)行調(diào)度后,在各個(gè)TaskManager 上部署 Task 后形成的“圖”,并不是一個(gè)具體的數(shù)據(jù)結(jié)構(gòu)。

      3 Worker與Slots

      每一個(gè)worker(TaskManager)是一個(gè)JVM進(jìn)程,它可能會(huì)在獨(dú)立的線程上執(zhí)行一個(gè)或多個(gè)subtask。為了控制一個(gè)worker能接收多少個(gè)task,worker通過(guò)task slot來(lái)進(jìn)行控制(一個(gè)worker至少有一個(gè)task slot)?!?/p>

      每個(gè)task slot表示TaskManager擁有資源的一個(gè)固定大小的子集。假如一個(gè)TaskManager有三個(gè)slot,那么它會(huì)將其管理的內(nèi)存分成三份給各個(gè)slot。資源slot化意味著一個(gè)subtask將不需要跟來(lái)自其他job的subtask競(jìng)爭(zhēng)被管理的內(nèi)存,取而代之的是它將擁有一定數(shù)量的內(nèi)存儲(chǔ)備。需要注意的是,這里不會(huì)涉及到CPU的隔離,slot目前僅僅用來(lái)隔離task的受管理的內(nèi)存。

      通過(guò)調(diào)整task slot的數(shù)量,允許用戶(hù)定義subtask之間如何互相隔離。如果一個(gè)TaskManager一個(gè)slot,那將意味著每個(gè)task group運(yùn)行在獨(dú)立的JVM中(該JVM可能是通過(guò)一個(gè)特定的容器啟動(dòng)的),而一個(gè)TaskManager多個(gè)slot意味著更多的subtask可以共享同一個(gè)JVM。而在同一個(gè)JVM進(jìn)程中的task將共享TCP連接(基于多路復(fù)用)和心跳消息。它們也可能共享數(shù)據(jù)集和數(shù)據(jù)結(jié)構(gòu),因此這減少了每個(gè)task的負(fù)載。

      圖 TaskManager與Slot

      Task Slot是靜態(tài)的概念,是指TaskManager具有的并發(fā)執(zhí)行能力,可以通過(guò)參數(shù)taskmanager.numberOfTaskSlots進(jìn)行配置,而并行度parallelism是動(dòng)態(tài)概念,即TaskManager運(yùn)行程序時(shí)實(shí)際使用的并發(fā)能力,可以通過(guò)參數(shù)parallelism.default進(jìn)行配置。

      也就是說(shuō),假設(shè)一共有3個(gè)TaskManager,每一個(gè)TaskManager中的分配3個(gè)TaskSlot,也就是每個(gè)TaskManager可以接收3個(gè)task,一共9個(gè)TaskSlot,如果我們?cè)O(shè)置parallelism.default=1,即運(yùn)行程序默認(rèn)的并行度為1,9個(gè)TaskSlot只用了1個(gè),有8個(gè)空閑,因此,設(shè)置合適的并行度才能提高效率。

       

       

       

      并行數(shù)據(jù)流

      Flink程序的執(zhí)行具有并行、分布式的特性。在執(zhí)行過(guò)程中,一個(gè) stream 包含一個(gè)或多個(gè) stream partition ,而每一個(gè) operator 包含一個(gè)或多個(gè) operator subtask,這些operator subtasks在不同的線程、不同的物理機(jī)或不同的容器中彼此互不依賴(lài)得執(zhí)行。

      一個(gè)特定operator的subtask的個(gè)數(shù)被稱(chēng)之為其parallelism(并行度)。一個(gè)stream的并行度總是等同于其producing operator的并行度。一個(gè)程序中,不同的operator可能具有不同的并行度。

      圖 并行數(shù)據(jù)流

      Stream在operator之間傳輸數(shù)據(jù)的形式可以是one-to-one(forwarding)的模式也可以是redistributing的模式,具體是哪一種形式,取決于operator的種類(lèi)。

      One-to-onestream(比如在source和map operator之間)維護(hù)著分區(qū)以及元素的順序。那意味著map operator的subtask看到的元素的個(gè)數(shù)以及順序跟source operator的subtask生產(chǎn)的元素的個(gè)數(shù)、順序相同,map、fliter、flatMap等算子都是one-to-one的對(duì)應(yīng)關(guān)系。

      類(lèi)似于spark中的窄依賴(lài)

      Redistributingstream(map()跟keyBy/window之間或者keyBy/window跟sink之間)的分區(qū)會(huì)發(fā)生改變。每一個(gè)operator subtask依據(jù)所選擇的transformation發(fā)送數(shù)據(jù)到不同的目標(biāo)subtask。例如,keyBy() 基于hashCode重分區(qū)、broadcast和rebalance會(huì)隨機(jī)重新分區(qū),這些算子都會(huì)引起redistribute過(guò)程,而redistribute過(guò)程就類(lèi)似于Spark中的shuffle過(guò)程。

      類(lèi)似于spark中的寬依賴(lài)

      task與operator chains

      相同并行度的one to one操作,F(xiàn)link這樣相連的operator 鏈接在一起形成一個(gè)task,原來(lái)的operator成為里面的subtask。將operators鏈接成task是非常有效的優(yōu)化:它能減少線程之間的切換和基于緩存區(qū)的數(shù)據(jù)交換,在減少時(shí)延的同時(shí)提升吞吐量。鏈接的行為可以在編程API中進(jìn)行指定。

       

       

      圖 task與operator chains

       

       

      OperatorChain的優(yōu)點(diǎn)

      ? 減少線程切換

      ? 減少序列化與反序列化

      ? 減少延遲并且提高吞吐能力

       

       

      ? OperatorChain 組成條件(重要)

      ? 上下游算子并行度一致

      ? 上下游算子之間沒(méi)有數(shù)據(jù)shuffle

       

       

       

       

       

       

       

       

       

      第五章 Flink 流處理Api

       

      Environment

      getExecutionEnvironment

      創(chuàng)建一個(gè)執(zhí)行環(huán)境,表示當(dāng)前執(zhí)行程序的上下文。 如果程序是獨(dú)立調(diào)用的,則此方法返回本地執(zhí)行環(huán)境;如果從命令行客戶(hù)端調(diào)用程序以提交到集群,則此方法返回此集群的執(zhí)行環(huán)境,也就是說(shuō),getExecutionEnvironment會(huì)根據(jù)查詢(xún)運(yùn)行的方式?jīng)Q定返回什么樣的運(yùn)行環(huán)境,是最常用的一種創(chuàng)建執(zhí)行環(huán)境的方式。

       

      val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

       

      如果沒(méi)有設(shè)置并行度,會(huì)以flink-conf.yaml中的配置為準(zhǔn),默認(rèn)是1

       

      createLocalEnvironment

      返回本地執(zhí)行環(huán)境,需要在調(diào)用時(shí)指定默認(rèn)的并行度。

      val env = StreamExecutionEnvironment.createLocalEnvironment(1)

       

      createRemoteEnvironment

      返回集群執(zhí)行環(huán)境,將Jar提交到遠(yuǎn)程服務(wù)器。需要在調(diào)用時(shí)指定JobManager的IP和端口號(hào),并指定要在集群中運(yùn)行的Jar包。

      val env = ExecutionEnvironment.createRemoteEnvironment("jobmanager-hostname", 6123,"C://jar//flink//wordcount.jar")

       

      Source

      創(chuàng)建kafka工具類(lèi)

      object MyKafkaUtil {

        val prop new Properties()

        prop.setProperty("bootstrap.servers","hadoop1:9092")
        prop.setProperty("group.id","gmall")

        def getConsumer(topic:String ):FlinkKafkaConsumer011[String]= {
            val myKafkaConsumer:FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String](topic, new SimpleStringSchema(), prop)
           myKafkaConsumer
        }


      }

       

       

       

      增加業(yè)務(wù)主類(lèi) StartupApp

      object StartupApp {

      def main(args: Array[String]): Unit = {
                val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

               val kafkaConsumer  =MyKafkaUtil.getConsumer("GMALL_STARTUP")

               val dstream: DataStream[String] = environment.addSource(kafkaConsumer)

                dstream.print()

                environment.execute()
      }

      }

       

       

       

      Flink+kafka是如何實(shí)現(xiàn)exactly-once語(yǔ)義的

       

      Flink通過(guò)checkpoint來(lái)保存數(shù)據(jù)是否處理完成的狀態(tài)

          

      由JobManager協(xié)調(diào)各個(gè)TaskManager進(jìn)行checkpoint存儲(chǔ),checkpoint保存在 StateBackend中,默認(rèn)StateBackend是內(nèi)存級(jí)的,也可以改為文件級(jí)的進(jìn)行持久化保存。

      執(zhí)行過(guò)程實(shí)際上是一個(gè)兩段式提交,每個(gè)算子執(zhí)行完成,會(huì)進(jìn)行“預(yù)提交”,直到執(zhí)行完sink操作,會(huì)發(fā)起“確認(rèn)提交”,如果執(zhí)行失敗,預(yù)提交會(huì)放棄掉。

      如果宕機(jī)需要通過(guò)StateBackend進(jìn)行恢復(fù),只能恢復(fù)所有確認(rèn)提交的操作。

       

       

       

       

       

       

       

       

       

       

       

       

      Transform

      轉(zhuǎn)換算子

       

       

      3.1 map

      val streamMap = stream.map { x => x * 2 }

       

       

      3.2 flatMap   

      val streamFlatMap = stream.flatMap{

          x => x.split(" ")

      }

       

       

      3.3 Filter

       

      val streamFilter = stream.filter{

          x => x == 1

      }

       

       

       

      3.4 KeyBy

      DataStream → KeyedStream:輸入必須是Tuple類(lèi)型,邏輯地將一個(gè)流拆分成不相交的分區(qū),每個(gè)分區(qū)包含具有相同key的元素,在內(nèi)部以hash的形式實(shí)現(xiàn)的。

      3.5 Reduce

      KeyedStream → DataStream:一個(gè)分組數(shù)據(jù)流的聚合操作,合并當(dāng)前的元素和上次聚合的結(jié)果,產(chǎn)生一個(gè)新的值,返回的流中包含每一次聚合的結(jié)果,而不是只返回最后一次聚合的最終結(jié)果。

       

      //求各個(gè)渠道的累計(jì)個(gè)數(shù)
      val startUplogDstream: DataStream[StartUpLog] = dstream.map{ JSON.parseObject(_,classOf[StartUpLog])}
      val keyedStream: KeyedStream[(String, Int), Tuple] = startUplogDstream.map(startuplog=>(startuplog.ch,1)).keyBy(0)
      //reduce //sum
      keyedStream.reduce{  (ch1,ch2)=>
        (ch1._1,ch1._2+ch2._2)
      } .print().setParallelism(1)

       

       

       

       

      flink是如何保存累計(jì)值的,

      flink是一種有狀態(tài)的流計(jì)算框架,其中說(shuō)的狀態(tài)包括兩個(gè)層面:

      1) operator state 主要是保存數(shù)據(jù)在流程中的處理狀態(tài),用于確保語(yǔ)義的exactly-once。

      2) keyed state  主要是保存數(shù)據(jù)在計(jì)算過(guò)程中的累計(jì)值。

       

      這兩種狀態(tài)都是通過(guò)checkpoint機(jī)制保存在StateBackend中,StateBackend可以選擇保存在內(nèi)存中(默認(rèn)使用)或者保存在磁盤(pán)文件中。

      3.6 Split 和 Select

      Split

       

      圖 Split

      DataStream → SplitStream:根據(jù)某些特征把一個(gè)DataStream拆分成兩個(gè)或者多個(gè)DataStream。

      Select

       

      圖 Select

      SplitStream→DataStream:從一個(gè)SplitStream中獲取一個(gè)或者多個(gè)DataStream。

       

       

       

      需求:把a(bǔ)ppstore和其他的渠道的數(shù)據(jù)單獨(dú)拆分出來(lái),做成兩個(gè)流

       

           // 將appstore與其他渠道拆分拆分出來(lái)  成為兩個(gè)獨(dú)立的流
      val splitStream: SplitStream[StartUpLog] = startUplogDstream.split { startUplog =>
        var flags:List[String] =  null
        if ("appstore" == startUplog.ch) {
          flags = List(startUplog.ch)
        } else {
          flags = List("other" )
        }
        flags
      }
      val appStoreStream: DataStream[StartUpLog] = splitStream.select("appstore")
      appStoreStream.print("apple:").setParallelism(1)
      val otherStream: DataStream[StartUpLog] = splitStream.select("other")
      otherStream.print("other:").setParallelism(1)

       

       

       

       

       

       

      3.7 Connect和 CoMap

       

      圖 Connect算子

      DataStream,DataStream → ConnectedStreams:連接兩個(gè)保持他們類(lèi)型的數(shù)據(jù)流,兩個(gè)數(shù)據(jù)流被Connect之后,只是被放在了一個(gè)同一個(gè)流中,內(nèi)部依然保持各自的數(shù)據(jù)和形式不發(fā)生任何變化,兩個(gè)流相互獨(dú)立。

       CoMap,CoFlatMap

       

      圖 CoMap/CoFlatMap

      ConnectedStreams → DataStream:作用于ConnectedStreams上,功能與map和flatMap一樣,對(duì)ConnectedStreams中的每一個(gè)Stream分別進(jìn)行map和flatMap處理。

       

      //合并以后打印
      val connStream: ConnectedStreams[StartUpLog, StartUpLog] = appStoreStream.connect(otherStream)
      val allStream: DataStream[String] = connStream.map(
        (log1: StartUpLog) => log1.ch,
        (log2: StartUpLog) => log2.ch
      )
      allStream.print("connect::")

       

       

       

      3.8 Union

       

      圖 Union

      DataStream → DataStream:對(duì)兩個(gè)或者兩個(gè)以上的DataStream進(jìn)行union操作,產(chǎn)生一個(gè)包含所有DataStream元素的新DataStream。注意:如果你將一個(gè)DataStream跟它自己做union操作,在新的DataStream中,你將看到每一個(gè)元素都出現(xiàn)兩次。

       

      //合并以后打印
      val unionStream: DataStream[StartUpLog] = appStoreStream.union(otherStream)
      unionStream.print("union:::")

       

       

       

      Connect與 Union 區(qū)別:

      1 、 Union之前兩個(gè)流的類(lèi)型必須是一樣,Connect可以不一樣,在之后的coMap中再去調(diào)整成為一樣的。

      2 Connect只能操作兩個(gè)流,Union可以操作多個(gè)

       

       

      4  Sink

         Flink沒(méi)有類(lèi)似于spark中foreach方法,讓用戶(hù)進(jìn)行迭代的操作。雖有對(duì)外的輸出操作都要利用Sink完成。最后通過(guò)類(lèi)似如下方式完成整個(gè)任務(wù)最終輸出操作。

         myDstream.addSink(new MySink(xxxx))

       

       

       

        官方提供了一部分的框架的sink。除此以外,需要用戶(hù)自定義實(shí)現(xiàn)sink。   

       

      4.1 Kafka

      pom.xml

      <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 -->
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
          <version>1.7.0</version>
      </dependency>

       

       

      mykafkaUtil中增加方法

      def getProducer(topic:String): FlinkKafkaProducer011[String] ={
        new FlinkKafkaProducer011[String](brokerList,topic,new SimpleStringSchema())
      }

       

       

      主函數(shù)中添加sink

      val myKafkaProducer: FlinkKafkaProducer011[String] = MyKafkaUtil.getProducer("channel_sum")

       

      sumDstream.map( chCount=>chCount._1+":"+chCount._2 ).addSink(myKafkaProducer)

       

       

       

      4.2 Redis

      pom.xml

      <!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis -->
      <dependency>
          <groupId>org.apache.bahir</groupId>
          <artifactId>flink-connector-redis_2.11</artifactId>
          <version>1.0</version>
      </dependency>

       

       

       

       

      object MyRedisUtil {

       

        val conf new FlinkJedisPoolConfig.Builder().setHost("hadoop1").setPort(6379).build()

        def getRedisSink(): RedisSink[(String,String)] ={
          new RedisSink[(String,String)](conf,new MyRedisMapper)
        }

        class MyRedisMapper extends RedisMapper[(String,String)]{
          override def getCommandDescription: RedisCommandDescription = {
            new RedisCommandDescription(RedisCommand.HSET"channel_count")
           // new RedisCommandDescription(RedisCommand.SET  )
          }

          override def getValueFromData(t: (String, String)): String = t._2

          override def getKeyFromData(t: (String, String)): String = t._1
        }

      }

       

       

      在主函數(shù)中調(diào)用

      sumDstream.map( chCount=>(chCount._1,chCount._2+"" )).addSink(MyRedisUtil.getRedisSink())

       

       

      4.3 Elasticsearch  

      pom.xml

      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
          <version>1.7.0</version>
      </dependency>

      <dependency>
          <groupId>org.apache.httpcomponents</groupId>
          <artifactId>httpclient</artifactId>
          <version>4.5.3</version>
      </dependency>

       

       

       

       

      添加MyEsUtil

      import java.util

      import com.alibaba.fastjson.{JSON, JSONObject}
      import org.apache.flink.api.common.functions.RuntimeContext
      import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
      import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
      import org.apache.http.HttpHost
      import org.elasticsearch.action.index.IndexRequest
      import org.elasticsearch.client.Requests

      object MyEsUtil {

        
        val httpHosts new util.ArrayList[HttpHost]
        httpHosts.add(new HttpHost("hadoop1",9200,"http"))
         httpHosts.add(new HttpHost("hadoop2",9200,"http"))
         httpHosts.add(new HttpHost("hadoop3",9200,"http"))


        def  getElasticSearchSink(indexName:String):  ElasticsearchSink[String]  ={
          val esFunc = new ElasticsearchSinkFunction[String] {
            override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
              println("試圖保存:"+element)
              val jsonObj: JSONObject = JSON.parseObject(element)
              val indexRequest: IndexRequest = Requests.indexRequest().index(indexName).`type`("_doc").source(jsonObj)
              indexer.add(indexRequest)
              println("保存1條")
            }
          }

          val sinkBuilder = new ElasticsearchSink.Builder[String](httpHosts, esFunc)

          //刷新前緩沖的最大動(dòng)作量
          sinkBuilder.setBulkFlushMaxActions(10)
       

           sinkBuilder.build()
        }

      }

       

       

       

      在main方法中調(diào)用

      // 明細(xì)發(fā)送到es 中
       val esSink: ElasticsearchSink[String] = MyEsUtil.getElasticSearchSink("gmall0503_startup")


        dstream.addSink(esSink)

       

       

       

       

      4.4 JDBC 自定義sink

      <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
      <dependency>
          <groupId>mysql</groupId>
          <artifactId>mysql-connector-java</artifactId>
          <version>5.1.44</version>
      </dependency>

      <dependency>
          <groupId>com.alibaba</groupId>
          <artifactId>druid</artifactId>
          <version>1.1.10</version>
      </dependency>

       

       

      添加MyJdbcSink

      class MyJdbcSink(sql:String ) extends  RichSinkFunction[Array[Any]] {

        val driver="com.mysql.jdbc.Driver"

        val url="jdbc:mysql://hadoop2:3306/gmall2019?useSSL=false"

        val username="root"

        val password="123123"

        val maxActive="20"

        var connection:Connection=null;

        //創(chuàng)建連接
        override def open(parameters: Configuration): Unit = {
          val properties = new Properties()
          properties.put("driverClassName",driver)
          properties.put("url",url)
          properties.put("username",username)
          properties.put("password",password)
          properties.put("maxActive",maxActive)


          val dataSource: DataSource = DruidDataSourceFactory.createDataSource(properties)
          connection = dataSource.getConnection()
        }

      //反復(fù)調(diào)用
        override def invoke(values: Array[Any]): Unit = {
          val ps: PreparedStatement = connection.prepareStatement(sql )
          println(values.mkString(","))
          for (i <- 0 until values.length) {
            ps.setObject(i + 1, values(i))
          }
          ps.executeUpdate()


        }

        override def close(): Unit = {

          if(connection!=null){
            connection.close()
          }

        }

      }

       

       

       

      在main方法中增加

         把明細(xì)保存到mysql中

                val startUplogDstream: DataStream[StartUpLog] = dstream.map{ JSON.parseObject(_,classOf[StartUpLog])}

      val jdbcSink = new MyJdbcSink("insert into z_startup values(?,?,?,?,?)")
      startUplogDstream.map(startuplog=>Array(startuplog.mid,startuplog.uid,startuplog.ch,startuplog.area,  startuplog.ts)).addSink(jdbcSink)

       

       

       

       

       

      第六章 Time與Window

      1 Time

      在Flink的流式處理中,會(huì)涉及到時(shí)間的不同概念,如下圖所示:

      圖 Flink時(shí)間概念

      Event Time:是事件創(chuàng)建的時(shí)間。它通常由事件中的時(shí)間戳描述,例如采集的日志數(shù)據(jù)中,每一條日志都會(huì)記錄自己的生成時(shí)間,F(xiàn)link通過(guò)時(shí)間戳分配器訪問(wèn)事件時(shí)間戳。

      Ingestion Time:是數(shù)據(jù)進(jìn)入Flink的時(shí)間。

      Processing Time:是每一個(gè)執(zhí)行基于時(shí)間操作的算子的本地系統(tǒng)時(shí)間,與機(jī)器相關(guān),默認(rèn)的時(shí)間屬性就是Processing Time。

      例如,一條日志進(jìn)入Flink的時(shí)間為2017-11-12 10:00:00.123,到達(dá)Window的系統(tǒng)時(shí)間為2017-11-12 10:00:01.234,日志的內(nèi)容如下:

      2017-11-02 18:37:15.624 INFO Fail over to rm2

      對(duì)于業(yè)務(wù)來(lái)說(shuō),要統(tǒng)計(jì)1min內(nèi)的故障日志個(gè)數(shù),哪個(gè)時(shí)間是最有意義的?—— eventTime,因?yàn)槲覀円鶕?jù)日志的生成時(shí)間進(jìn)行統(tǒng)計(jì)。

      2 Window

      2.1 Window概述

      streaming流式計(jì)算是一種被設(shè)計(jì)用于處理無(wú)限數(shù)據(jù)集的數(shù)據(jù)處理引擎,而無(wú)限數(shù)據(jù)集是指一種不斷增長(zhǎng)的本質(zhì)上無(wú)限的數(shù)據(jù)集,而window是一種切割無(wú)限數(shù)據(jù)為有限塊進(jìn)行處理的手段。

      Window是無(wú)限數(shù)據(jù)流處理的核心,Window將一個(gè)無(wú)限的stream拆分成有限大小的“buckets”桶,我們可以在這些桶上做計(jì)算操作。

      2.2 Window類(lèi)型

      Window可以分成兩類(lèi):

      l CountWindow:按照指定的數(shù)據(jù)條數(shù)生成一個(gè)Window,與時(shí)間無(wú)關(guān)。

      l TimeWindow:按照時(shí)間生成Window。

      對(duì)于TimeWindow,可以根據(jù)窗口實(shí)現(xiàn)原理的不同分成三類(lèi):滾動(dòng)窗口(Tumbling Window)、滑動(dòng)窗口(Sliding Window)和會(huì)話(huà)窗口(Session Window)。

      1. 滾動(dòng)窗口(Tumbling Windows)

      將數(shù)據(jù)依據(jù)固定的窗口長(zhǎng)度對(duì)數(shù)據(jù)進(jìn)行切片。

      特點(diǎn)時(shí)間對(duì)齊,窗口長(zhǎng)度固定,沒(méi)有重疊

      滾動(dòng)窗口分配器將每個(gè)元素分配到一個(gè)指定窗口大小的窗口中,滾動(dòng)窗口有一個(gè)固定的大小,并且不會(huì)出現(xiàn)重疊。例如:如果你指定了一個(gè)5分鐘大小的滾動(dòng)窗口,窗口的創(chuàng)建如下圖所示:

      圖 滾動(dòng)窗口

      適用場(chǎng)景:適合做BI統(tǒng)計(jì)等(做每個(gè)時(shí)間段的聚合計(jì)算)。

      2. 滑動(dòng)窗口(Sliding Windows)

      滑動(dòng)窗口是固定窗口的更廣義的一種形式,滑動(dòng)窗口由固定的窗口長(zhǎng)度和滑動(dòng)間隔組成

      特點(diǎn)時(shí)間對(duì)齊,窗口長(zhǎng)度固定,有重疊。

      滑動(dòng)窗口分配器將元素分配到固定長(zhǎng)度的窗口中,與滾動(dòng)窗口類(lèi)似,窗口的大小由窗口大小參數(shù)來(lái)配置,另一個(gè)窗口滑動(dòng)參數(shù)控制滑動(dòng)窗口開(kāi)始的頻率。因此,滑動(dòng)窗口如果滑動(dòng)參數(shù)小于窗口大小的話(huà),窗口是可以重疊的,在這種情況下元素會(huì)被分配到多個(gè)窗口中。

      例如,你有10分鐘的窗口和5分鐘的滑動(dòng),那么每個(gè)窗口中5分鐘的窗口里包含著上個(gè)10分鐘產(chǎn)生的數(shù)據(jù),如下圖所示:

      圖 滑動(dòng)窗口

      適用場(chǎng)景:對(duì)最近一個(gè)時(shí)間段內(nèi)的統(tǒng)計(jì)(求某接口最近5min的失敗率來(lái)決定是否要報(bào)警)。

      3. 會(huì)話(huà)窗口(Session Windows)

      由一系列事件組合一個(gè)指定時(shí)間長(zhǎng)度的timeout間隙組成,類(lèi)似于web應(yīng)用的session,也就是一段時(shí)間沒(méi)有接收到新數(shù)據(jù)就會(huì)生成新的窗口。

      特點(diǎn)時(shí)間無(wú)對(duì)齊。

      session窗口分配器通過(guò)session活動(dòng)來(lái)對(duì)元素進(jìn)行分組,session窗口跟滾動(dòng)窗口和滑動(dòng)窗口相比,不會(huì)有重疊和固定的開(kāi)始時(shí)間和結(jié)束時(shí)間的情況,相反,當(dāng)它在一個(gè)固定的時(shí)間周期內(nèi)不再收到元素,即非活動(dòng)間隔產(chǎn)生,那個(gè)這個(gè)窗口就會(huì)關(guān)閉。一個(gè)session窗口通過(guò)一個(gè)session間隔來(lái)配置,這個(gè)session間隔定義了非活躍周期的長(zhǎng)度,當(dāng)這個(gè)非活躍周期產(chǎn)生,那么當(dāng)前的session將關(guān)閉并且后續(xù)的元素將被分配到新的session窗口中去。

      圖 會(huì)話(huà)窗口

       

      3 Window API

      TimeWindow

      TimeWindow是將指定時(shí)間范圍內(nèi)的所有數(shù)據(jù)組成一個(gè)window,一次對(duì)一個(gè)window里面的所有數(shù)據(jù)進(jìn)行計(jì)算。

      1. 滾動(dòng)窗口

      Flink默認(rèn)的時(shí)間窗口根據(jù)Processing Time 進(jìn)行窗口的劃分,將Flink獲取到的數(shù)據(jù)根據(jù)進(jìn)入Flink的時(shí)間劃分到不同的窗口中。

       

      val keyedStream: KeyedStream[(String, Int), Tuple] = startUplogDstream.map(startuplog=>(startuplog.ch,1)).keyBy(0)
      //每10統(tǒng)計(jì)一次各個(gè)渠道的計(jì)數(shù)
      val windowedStream: WindowedStream[(String, Int), Tuple, TimeWindow] = keyedStream.timeWindow(Time.seconds(10))
      val sumDstream: DataStream[(String, Int)] = windowedStream.sum(1)

       

       

       

      時(shí)間間隔可以通過(guò)Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一個(gè)來(lái)指定。

      2. 滑動(dòng)窗口(SlidingEventTimeWindows)

      滑動(dòng)窗口和滾動(dòng)窗口的函數(shù)名是完全一致的,只是在傳參數(shù)時(shí)需要傳入兩個(gè)參數(shù),一個(gè)是window_size,一個(gè)是sliding_size。

      下面代碼中的sliding_size設(shè)置為了2s,也就是說(shuō),窗口每2s就計(jì)算一次,每一次計(jì)算的window范圍是5s內(nèi)的所有元素。

      val keyedStream: KeyedStream[(String, Int), Tuple] = startUplogDstream.map(startuplog=>(startuplog.ch,1)).keyBy(0)
      //每5秒統(tǒng)計(jì)一次最近10秒的各個(gè)渠道的計(jì)數(shù)
      val windowedStream: WindowedStream[(String, Int), Tuple, TimeWindow] = keyedStream.timeWindow(Time.seconds(10),Time.seconds(5))
      val sumDstream: DataStream[(String, Int)] = windowedStream.sum(1)

       

       

       

      時(shí)間間隔可以通過(guò)Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一個(gè)來(lái)指定。

       

       

      CountWindow

      CountWindow根據(jù)窗口中相同key元素的數(shù)量來(lái)觸發(fā)執(zhí)行,執(zhí)行時(shí)只計(jì)算元素?cái)?shù)量達(dá)到窗口大小的key對(duì)應(yīng)的結(jié)果。

      注意:CountWindow的window_size指的是相同Key的元素的個(gè)數(shù),不是輸入的所有元素的總數(shù)。

      1 滾動(dòng)窗口

      默認(rèn)的CountWindow是一個(gè)滾動(dòng)窗口,只需要指定窗口大小即可,當(dāng)元素?cái)?shù)量達(dá)到窗口大小時(shí),就會(huì)觸發(fā)窗口的執(zhí)行。

       

      val keyedStream: KeyedStream[(String, Int), Tuple] = startUplogDstream.map(startuplog=>(startuplog.ch,1)).keyBy(0)
       //每當(dāng)某一個(gè)key的個(gè)數(shù)達(dá)到10的時(shí)候,顯示出來(lái)
       val windowedStream: WindowedStream[(String, Int), Tuple, GlobalWindow] = keyedStream.countWindow(10)
       val sumDstream: DataStream[(String, Int)] = windowedStream.sum(1)

       

       

      2 滑動(dòng)窗口

      滑動(dòng)窗口和滾動(dòng)窗口的函數(shù)名是完全一致的,只是在傳參數(shù)時(shí)需要傳入兩個(gè)參數(shù),一個(gè)是window_size,一個(gè)是sliding_size。

      下面代碼中的sliding_size設(shè)置為了2,也就是說(shuō),每收到兩個(gè)相同key的數(shù)據(jù)就計(jì)算一次,每一次計(jì)算的window范圍是5個(gè)元素。

       

       

      val keyedStream: KeyedStream[(String, Int), Tuple] = startUplogDstream.map(startuplog=>(startuplog.ch,1)).keyBy(0)
      //每當(dāng)某一個(gè)key的個(gè)數(shù)達(dá)到2的時(shí)候,觸發(fā)計(jì)算,計(jì)算最近該key最近10個(gè)元素的內(nèi)容
      val windowedStream: WindowedStream[(String, Int), Tuple, GlobalWindow] = keyedStream.countWindow(10,2)
      val sumDstream: DataStream[(String, Int)] = windowedStream.sum(1)

       

       

       

      第七章 EventTime與Window

      1 EventTime的引入

      在Flink的流式處理中,絕大部分的業(yè)務(wù)都會(huì)使用eventTime,一般只在eventTime無(wú)法使用時(shí),才會(huì)被迫使用ProcessingTime或者IngestionTime。

      如果要使用EventTime,那么需要引入EventTime的時(shí)間屬性,引入方式如下所示:

      val env = StreamExecutionEnvironment.getExecutionEnvironment

       

      // 從調(diào)用時(shí)刻開(kāi)始給env創(chuàng)建的每一個(gè)stream追加時(shí)間特征

      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

      2 Watermark

      2.1 基本概念

      我們知道,流處理從事件產(chǎn)生,到流經(jīng)source,再到operator,中間是有一個(gè)過(guò)程和時(shí)間的,雖然大部分情況下,流到operator的數(shù)據(jù)都是按照事件產(chǎn)生的時(shí)間順序來(lái)的,但是也不排除由于網(wǎng)絡(luò)、分布式等原因,導(dǎo)致亂序的產(chǎn)生,所謂亂序,就是指Flink接收到的事件的先后順序不是嚴(yán)格按照事件的Event Time順序排列的。

      圖 數(shù)據(jù)的亂序

      那么此時(shí)出現(xiàn)一個(gè)問(wèn)題,一旦出現(xiàn)亂序,如果只根據(jù)eventTime決定window的運(yùn)行,我們不能明確數(shù)據(jù)是否全部到位,但又不能無(wú)限期的等下去,此時(shí)必須要有個(gè)機(jī)制來(lái)保證一個(gè)特定的時(shí)間后,必須觸發(fā)window去進(jìn)行計(jì)算了,這個(gè)特別的機(jī)制,就是Watermark。

      Watermark是一種衡量Event Time進(jìn)展的機(jī)制,它是數(shù)據(jù)本身的一個(gè)隱藏屬性,數(shù)據(jù)本身攜帶著對(duì)應(yīng)的Watermark。

      Watermark是用于處理亂序事件的,而正確的處理亂序事件,通常用Watermark機(jī)制結(jié)合window來(lái)實(shí)現(xiàn)

      數(shù)據(jù)流中的Watermark用于表示timestamp小于Watermark的數(shù)據(jù),都已經(jīng)到達(dá)了,因此,window的執(zhí)行也是由Watermark觸發(fā)的

      Watermark可以理解成一個(gè)延遲觸發(fā)機(jī)制,我們可以設(shè)置Watermark的延時(shí)時(shí)長(zhǎng)t,每次系統(tǒng)會(huì)校驗(yàn)已經(jīng)到達(dá)的數(shù)據(jù)中最大的maxEventTime,然后認(rèn)定eventTime小于maxEventTime - t的所有數(shù)據(jù)都已經(jīng)到達(dá),如果有窗口的停止時(shí)間等于maxEventTime – t,那么這個(gè)窗口被觸發(fā)執(zhí)行

      有序流的Watermarker如下圖所示:(Watermark設(shè)置為0)

      圖 有序數(shù)據(jù)的Watermark

      亂序流的Watermarker如下圖所示:(Watermark設(shè)置為2)

      圖 無(wú)序數(shù)據(jù)的Watermark

      當(dāng)Flink接收到每一條數(shù)據(jù)時(shí),都會(huì)產(chǎn)生一條Watermark,這條Watermark就等于當(dāng)前所有到達(dá)數(shù)據(jù)中的maxEventTime - 延遲時(shí)長(zhǎng),也就是說(shuō),Watermark是由數(shù)據(jù)攜帶的,一旦數(shù)據(jù)攜帶的Watermark比當(dāng)前未觸發(fā)的窗口的停止時(shí)間要晚,那么就會(huì)觸發(fā)相應(yīng)窗口的執(zhí)行。由于Watermark是由數(shù)據(jù)攜帶的,因此,如果運(yùn)行過(guò)程中無(wú)法獲取新的數(shù)據(jù),那么沒(méi)有被觸發(fā)的窗口將永遠(yuǎn)都不被觸發(fā)。

      上圖中,我們?cè)O(shè)置的允許最大延遲到達(dá)時(shí)間為2s,所以時(shí)間戳為7s的事件對(duì)應(yīng)的Watermark是5s,時(shí)間戳為12s的事件的Watermark是10s,如果我們的窗口1是1s~5s,窗口2是6s~10s,那么時(shí)間戳為7s的事件到達(dá)時(shí)的Watermarker恰好觸發(fā)窗口1,時(shí)間戳為12s的事件到達(dá)時(shí)的Watermark恰好觸發(fā)窗口2。

       

      Watermark 就是觸發(fā)前一窗口的“關(guān)窗時(shí)間”,一旦觸發(fā)關(guān)門(mén)那么以當(dāng)前時(shí)刻為準(zhǔn)在窗口范圍內(nèi)的所有所有數(shù)據(jù)都會(huì)收入窗中。

      只要沒(méi)有達(dá)到水位那么不管現(xiàn)實(shí)中的時(shí)間推進(jìn)了多久都不會(huì)觸發(fā)關(guān)窗。

      2.2 Watermark的引入

       

      val textWithEventTimeDstream: DataStream[(String, Long, Int)] = textWithTsDstream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.milliseconds(1000)) {
        override def extractTimestamp(element: (String, Long, Int)): Long = {

         return  element._2
        }
      })

       

       

      7.3 EvnetTimeWindow API

      3.1 滾動(dòng)窗口(TumblingEventTimeWindows 

      def main(args: Array[String]): Unit = {
          //  環(huán)境
          val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

          env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
          env.setParallelism(1)

          val dstream: DataStream[String] = env.socketTextStream("hadoop1",7777)



          val textWithTsDstream: DataStream[(String, Long, Int)] = dstream.map { text =>
            val arr: Array[String] = text.split(" ")
            (arr(0), arr(1).toLong, 1)
          }
          val textWithEventTimeDstream: DataStream[(String, Long, Int)] = textWithTsDstream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.milliseconds(1000)) {
            override def extractTimestamp(element: (String, Long, Int)): Long = {

             return  element._2
            }
          })

          val textKeyStream: KeyedStream[(String, Long, Int), Tuple] = textWithEventTimeDstream.keyBy(0)
          textKeyStream.print("textkey:")

          val windowStream: WindowedStream[(String, Long, Int), Tuple, TimeWindow] = textKeyStream.window(TumblingEventTimeWindows.of(Time.seconds(2)))

          val groupDstream: DataStream[mutable.HashSet[Long]] = windowStream.fold(new mutable.HashSet[Long]()) { case (set, (key, ts, count)) =>
            set += ts
          }

          groupDstream.print("window::::").setParallelism(1)

          env.execute()

        }

      }

       

       

       

      結(jié)果是按照Event Time的時(shí)間窗口計(jì)算得出的,而無(wú)關(guān)系統(tǒng)的時(shí)間(包括輸入的快慢)。

      3.2 滑動(dòng)窗口(SlidingEventTimeWindows

        

      def main(args: Array[String]): Unit = {
        //  環(huán)境
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.setParallelism(1)

        val dstream: DataStream[String] = env.socketTextStream("hadoop1",7777)



        val textWithTsDstream: DataStream[(String, Long, Int)] = dstream.map { text =>
          val arr: Array[String] = text.split(" ")
          (arr(0), arr(1).toLong, 1)
        }
        val textWithEventTimeDstream: DataStream[(String, Long, Int)] = textWithTsDstream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.milliseconds(1000)) {
          override def extractTimestamp(element: (String, Long, Int)): Long = {

           return  element._2
          }
        })

        val textKeyStream: KeyedStream[(String, Long, Int), Tuple] = textWithEventTimeDstream.keyBy(0)
        textKeyStream.print("textkey:")

        val windowStream: WindowedStream[(String, Long, Int), Tuple, TimeWindow] = textKeyStream.window(SlidingEventTimeWindows.of(Time.seconds(2),Time.milliseconds(500)))

        val groupDstream: DataStream[mutable.HashSet[Long]] = windowStream.fold(new mutable.HashSet[Long]()) { case (set, (key, ts, count)) =>
          set += ts
        }

        groupDstream.print("window::::").setParallelism(1)

        env.execute()

      }

       

       

       

      3.3 會(huì)話(huà)窗口(EventTimeSessionWindows

      相鄰兩次數(shù)據(jù)的EventTime的時(shí)間差超過(guò)指定的時(shí)間間隔就會(huì)觸發(fā)執(zhí)行。如果加入Watermark, 會(huì)在符合窗口觸發(fā)的情況下進(jìn)行延遲。到達(dá)延遲水位再進(jìn)行窗口觸發(fā)。

      def main(args: Array[String]): Unit = {
          //  環(huán)境
          val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

          env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
          env.setParallelism(1)

          val dstream: DataStream[String] = env.socketTextStream("hadoop1",7777)



          val textWithTsDstream: DataStream[(String, Long, Int)] = dstream.map { text =>
            val arr: Array[String] = text.split(" ")
            (arr(0), arr(1).toLong, 1)
          }
          val textWithEventTimeDstream: DataStream[(String, Long, Int)] = textWithTsDstream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.milliseconds(1000)) {
            override def extractTimestamp(element: (String, Long, Int)): Long = {

             return  element._2
            }
          })

          val textKeyStream: KeyedStream[(String, Long, Int), Tuple] = textWithEventTimeDstream.keyBy(0)
          textKeyStream.print("textkey:")

          val windowStream: WindowedStream[(String, Long, Int), Tuple, TimeWindow] = textKeyStream.window(EventTimeSessionWindows.withGap(Time.milliseconds(500)) )

       

          windowStream.reduce((text1,text2)=>
            (  text1._1,0L,text1._3+text2._3)
          )  .map(_._3).print("windows:::").setParallelism(1)

          env.execute()

        }

       

       

       

       

       

      第八章 Table API SQL

       

      Table API是流處理和批處理通用的關(guān)系型API,Table API可以基于流輸入或者批輸入來(lái)運(yùn)行而不需要進(jìn)行任何修改。Table API是SQL語(yǔ)言的超集并專(zhuān)門(mén)為Apache Flink設(shè)計(jì)的,Table API是Scala 和Java語(yǔ)言集成式的API。與常規(guī)SQL語(yǔ)言中將查詢(xún)指定為字符串不同,Table API查詢(xún)是以Java或Scala中的語(yǔ)言嵌入樣式來(lái)定義的,具有IDE支持如:自動(dòng)完成和語(yǔ)法檢測(cè)。

       

       

      1 需要引入的pom依賴(lài)

      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table_2.11</artifactId>
          <version>1.7.0</version>
      </dependency>

       

       

       

      2 構(gòu)造表環(huán)境

       

      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

        val myKafkaConsumer: FlinkKafkaConsumer011[String] = MyKafkaUtil.getConsumer("GMALL_STARTUP")
        val dstream: DataStream[String] = env.addSource(myKafkaConsumer)

        val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)

        val startupLogDstream: DataStream[StartupLog] = dstream.map{ jsonString =>JSON.parseObject(jsonString,classOf[StartupLog]) }

        val startupLogTable: Table = tableEnv.fromDataStream(startupLogDstream)

         val table: Table = startupLogTable.select("mid,ch").filter("ch ='appstore'")

        val midchDataStream: DataStream[(String, String)] = table.toAppendStream[(String,String)]

        midchDataStream.print()
        env.execute()
      }

       

       

      動(dòng)態(tài)表

      如果流中的數(shù)據(jù)類(lèi)型是case class可以直接根據(jù)case class的結(jié)構(gòu)生成table

      tableEnv.fromDataStream(startupLogDstream)  

      或者根據(jù)字段順序單獨(dú)命名

      tableEnv.fromDataStream(startupLogDstream,’mid,’uid  .......)  

       

      最后的動(dòng)態(tài)表可以轉(zhuǎn)換為流進(jìn)行輸出

      table.toAppendStream[(String,String)]

       

      字段

       用一個(gè)單引放到字段前面 來(lái)標(biāo)識(shí)字段名, 如 ‘name , ‘mid ,’amount 等

       

       

      3 通過(guò)一個(gè)例子 了解TableAPI 

      //每10秒中渠道為appstore的個(gè)數(shù)
      def main(args: Array[String]): Unit = {
        //sparkcontext
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

        //時(shí)間特性改為eventTime
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

        val myKafkaConsumer: FlinkKafkaConsumer011[String] = MyKafkaUtil.getConsumer("GMALL_STARTUP")
        val dstream: DataStream[String] = env.addSource(myKafkaConsumer)

        val startupLogDstream: DataStream[StartupLog] = dstream.map{ jsonString =>JSON.parseObject(jsonString,classOf[StartupLog]) }
        //告知watermark 和 eventTime如何提取
        val startupLogWithEventTimeDStream: DataStream[StartupLog] = startupLogDstream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[StartupLog](Time.seconds(0L)) {
          override def extractTimestamp(element: StartupLog): Long = {
            element.ts
          }
        }).setParallelism(1)

        //SparkSession
        val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)

        //把數(shù)據(jù)流轉(zhuǎn)化成Table
        val startupTable: Table = tableEnv.fromDataStream(startupLogWithEventTimeDStream , 'mid,'uid,'appid,'area,'os,'ch,'logType,'vs,'logDate,'logHour,'logHourMinute,'ts.rowtime)

        //通過(guò)table api 進(jìn)行操作
        // 每10秒 統(tǒng)計(jì)一次各個(gè)渠道的個(gè)數(shù) table api 解決
        //1 groupby  2 要用 window   3 用eventtime來(lái)確定開(kāi)窗時(shí)間
        val resultTable: Table = startupTable.window(Tumble over 10000.millis on 'ts as 'tt).groupBy('ch,'tt ).select( 'ch'ch.count)
       
       

        //把Table轉(zhuǎn)化成數(shù)據(jù)流
        //val appstoreDStream: DataStream[(String, String, Long)] = appstoreTable.toAppendStream[(String,String,Long)]
        val resultDstream: DataStream[(Boolean, (String, Long))] = resultSQLTable.toRetractStream[(String,Long)]

        resultDstream.filter(_._1).print()

        env.execute()

      }

       

       

       

       

       

      關(guān)于group by

      1、 如果使用 groupby table轉(zhuǎn)換為流的時(shí)候只能用toRetractDstream

        val rDstream: DataStream[(Boolean, (String, Long))] = table.toRetractStream[(String,Long)]

       

      2、 toRetractDstream 得到的第一個(gè)boolean型字段標(biāo)識(shí) true就是最新的數(shù)據(jù),false表示過(guò)期老數(shù)據(jù)

       

        val rDstream: DataStream[(Boolean, (String, Long))] = table.toRetractStream[(String,Long)]

        rDstream.filter(_._1).print()

       

      3、 如果使用的api包括時(shí)間窗口,那么時(shí)間的字段必須,包含在group by中。

        val table: Table = startupLogTable.filter("ch ='appstore'").window(Tumble over 10000.millis on 'ts as 'tt).groupBy('ch ,'tt).select("ch,ch.count ")

       

      關(guān)于時(shí)間窗口

      1 用到時(shí)間窗口,必須提前聲明時(shí)間字段,如果是processTime直接在創(chuàng)建動(dòng)態(tài)表時(shí)進(jìn)行追加就可以

       

      val startupLogTable: Table = tableEnv.fromDataStream(startupLogWithEtDstream,'mid,'uid,'appid,'area,'os,'ch,'logType,'vs,'logDate,'logHour,'logHourMinute,'ts.rowtime)

       

      2 如果是EventTime要在創(chuàng)建動(dòng)態(tài)表時(shí)聲明

      val startupLogTable: Table = tableEnv.fromDataStream(startupLogWithEtDstream,'mid,'uid,'appid,'area,'os,'ch,'logType,'vs,'logDate,'logHour,'logHourMinute,'ps.processtime)

       

       

      3 滾動(dòng)窗口可以使用Tumble over 10000.millis on

        val table: Table = startupLogTable.filter("ch ='appstore'").window(Tumble over 10000.millis on 'ts as 'tt).groupBy('ch ,'tt).select("ch,ch.count ")

       

      SQL如何編寫(xiě)

       

      def main(args: Array[String]): Unit = {
        //sparkcontext
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

        //時(shí)間特性改為eventTime
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

        val myKafkaConsumer: FlinkKafkaConsumer011[String] = MyKafkaUtil.getConsumer("GMALL_STARTUP")
        val dstream: DataStream[String] = env.addSource(myKafkaConsumer)

        val startupLogDstream: DataStream[StartupLog] = dstream.map{ jsonString =>JSON.parseObject(jsonString,classOf[StartupLog]) }
        //告知watermark 和 eventTime如何提取
        val startupLogWithEventTimeDStream: DataStream[StartupLog] = startupLogDstream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[StartupLog](Time.seconds(0L)) {
          override def extractTimestamp(element: StartupLog): Long = {
            element.ts
          }
        }).setParallelism(1)

        //SparkSession
        val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)

        //把數(shù)據(jù)流轉(zhuǎn)化成Table
        val startupTable: Table = tableEnv.fromDataStream(startupLogWithEventTimeDStream , 'mid,'uid,'appid,'area,'os,'ch,'logType,'vs,'logDate,'logHour,'logHourMinute,'ts.rowtime)

        //通過(guò)table api 進(jìn)行操作
        // 每10秒 統(tǒng)計(jì)一次各個(gè)渠道的個(gè)數(shù) table api 解決
        //1 groupby  2 要用 window   3 用eventtime來(lái)確定開(kāi)窗時(shí)間
        val resultTable: Table = startupTable.window(Tumble over 10000.millis on 'ts as 'tt).groupBy('ch,'tt ).select( 'ch'ch.count)
       // 通過(guò)sql 進(jìn)行操作

        val resultSQLTable : Table = tableEnv.sqlQuery( "select ch ,count(ch)   from "+startupTable+"  group by ch   ,Tumble(ts,interval '10' SECOND )")

        //把Table轉(zhuǎn)化成數(shù)據(jù)流
        //val appstoreDStream: DataStream[(String, String, Long)] = appstoreTable.toAppendStream[(String,String,Long)]
        val resultDstream: DataStream[(Boolean, (String, Long))] = resultSQLTable.toRetractStream[(String,Long)]

        resultDstream.filter(_._1).print()

        env.execute()

      }

       

      posted @ 2019-08-09 14:31  cerofang  閱讀(2348)  評(píng)論(0)    收藏  舉報(bào)
      主站蜘蛛池模板: 久久精品| 久久久久国产精品熟女影院| 真实国产老熟女无套内射| 视频免费完整版在线播放| 国产一码二码三码区别| 国产日韩一区二区四季| 国产精品国产三级国产an| 二区三区亚洲精品国产| 欧美成人片在线观看| аⅴ天堂中文在线网| 九九热在线视频精品免费| 高级艳妇交换俱乐部小说| 青青草成人免费自拍视频| 在线午夜精品自拍小视频| 人妻va精品va欧美va| 国精品午夜福利视频不卡| 亚洲成人av综合一区| 久久99久国产精品66| 国产欧亚州美日韩综合区| 小嫩模无套内谢第一次| 亚洲a∨无码一区二区三区| 无码天堂亚洲国产av麻豆| 久久蜜臀av一区三区| 欧美激情精品久久| 国产偷国产偷亚洲高清人| 伊人色综合一区二区三区影院视频| 无码av中文一区二区三区桃花岛| 国产主播精品福利午夜二区| 最新偷拍一区二区三区| 亚洲日韩av无码中文字幕美国 | 免费观看欧美猛交视频黑人| 成人免费看片又大又黄| 久久成人伊人欧洲精品| 免费 黄 色 人成 视频 在 线| 少妇爆乳无码专区| 久久蜜臀av一区三区| 丰满少妇内射一区| 国产精品久久久久av福利动漫| 日本美女性亚洲精品黄色| 亚洲人妻中文字幕一区| 国产超级va在线观看视频|