大數(shù)據(jù)課程之Flink
大數(shù)據(jù)課程之Flink
第一章 Flink簡(jiǎn)介
1、初識(shí)Flink
Apache Flink是一個(gè)框架和分布式處理引擎,用于對(duì)無(wú)界和有界數(shù)據(jù)流進(jìn)行有狀態(tài)計(jì)算。Flink被設(shè)計(jì)在所有常見(jiàn)的集群環(huán)境中運(yùn)行,以?xún)?nèi)存執(zhí)行速度和任意規(guī)模來(lái)執(zhí)行計(jì)算。
Flink起源于Stratosphere項(xiàng)目,Stratosphere是在2010~2014年由3所地處柏林的大學(xué)和歐洲的一些其他的大學(xué)共同進(jìn)行的研究項(xiàng)目,2014年4月Stratosphere的代碼被復(fù)制并捐贈(zèng)給了Apache軟件基金會(huì),參加這個(gè)孵化項(xiàng)目的初始成員是Stratosphere系統(tǒng)的核心開(kāi)發(fā)人員,2014年12月,F(xiàn)link一躍成為Apache軟件基金會(huì)的頂級(jí)項(xiàng)目。
在德語(yǔ)中,F(xiàn)link一詞表示快速和靈巧,項(xiàng)目采用一只松鼠的彩色圖案作為logo,這不僅是因?yàn)樗墒缶哂锌焖俸挽`巧的特點(diǎn),還因?yàn)榘亓值乃墒笥幸环N迷人的紅棕色,而Flink的松鼠logo擁有可愛(ài)的尾巴,尾巴的顏色與Apache軟件基金會(huì)的logo顏色相呼應(yīng),也就是說(shuō),這是一只Apache風(fēng)格的松鼠。

圖 Flink Logo
Flink雖然誕生的早(2010年),但是其實(shí)是起大早趕晚集,直到2015年才開(kāi)始突然爆發(fā)熱度。
在Flink被apache提升為頂級(jí)項(xiàng)目之后,阿里實(shí)時(shí)計(jì)算團(tuán)隊(duì)決定在阿里內(nèi)部建立一個(gè) Flink 分支 Blink,并對(duì) Flink 進(jìn)行大量的修改和完善,讓其適應(yīng)阿里巴巴這種超大規(guī)模的業(yè)務(wù)場(chǎng)景。
Blink由2016年上線,服務(wù)于阿里集團(tuán)內(nèi)部搜索、推薦、廣告和螞蟻等大量核心實(shí)時(shí)業(yè)務(wù)。與2019年1月Blink正式開(kāi)源,目前阿里70%的技術(shù)部門(mén)都有使用該版本。
Blink比起Flink的優(yōu)勢(shì)就是對(duì)SQL語(yǔ)法的更完善的支持以及執(zhí)行SQL的性能提升。
2 Flink的重要特點(diǎn)
2.1 事件驅(qū)動(dòng)型(Event-driven)
事件驅(qū)動(dòng)型應(yīng)用是一類(lèi)具有狀態(tài)的應(yīng)用,它從一個(gè)或多個(gè)事件流提取數(shù)據(jù),并根據(jù)到來(lái)的事件觸發(fā)計(jì)算、狀態(tài)更新或其他外部動(dòng)作。比較典型的就是以kafka為代表的消息隊(duì)列幾乎都是事件驅(qū)動(dòng)型應(yīng)用。
與之不同的就是SparkStreaming微批次,如圖:
事件驅(qū)動(dòng)型:
2.2 流與批的世界觀
批處理的特點(diǎn)是有界、持久、大量,非常適合需要訪問(wèn)全套記錄才能完成的計(jì)算工作,一般用于離線統(tǒng)計(jì)。
流處理的特點(diǎn)是無(wú)界、實(shí)時(shí), 無(wú)需針對(duì)整個(gè)數(shù)據(jù)集執(zhí)行操作,而是對(duì)通過(guò)系統(tǒng)傳輸?shù)拿總€(gè)數(shù)據(jù)項(xiàng)執(zhí)行操作,一般用于實(shí)時(shí)統(tǒng)計(jì)。
在spark的世界觀中,一切都是由批次組成的,離線數(shù)據(jù)是一個(gè)大批次,而實(shí)時(shí)數(shù)據(jù)是由一個(gè)一個(gè)無(wú)限的小批次組成的。
而在flink的世界觀中,一切都是由流組成的,離線數(shù)據(jù)是有界限的流,實(shí)時(shí)數(shù)據(jù)是一個(gè)沒(méi)有界限的流,這就是所謂的有界流和無(wú)界流。
無(wú)界數(shù)據(jù)流:無(wú)界數(shù)據(jù)流有一個(gè)開(kāi)始但是沒(méi)有結(jié)束,它們不會(huì)在生成時(shí)終止并提供數(shù)據(jù),必須連續(xù)處理無(wú)界流,也就是說(shuō)必須在獲取后立即處理event。對(duì)于無(wú)界數(shù)據(jù)流我們無(wú)法等待所有數(shù)據(jù)都到達(dá),因?yàn)檩斎胧菬o(wú)界的,并且在任何時(shí)間點(diǎn)都不會(huì)完成。處理無(wú)界數(shù)據(jù)通常要求以特定順序(例如事件發(fā)生的順序)獲取event,以便能夠推斷結(jié)果完整性。
有界數(shù)據(jù)流:有界數(shù)據(jù)流有明確定義的開(kāi)始和結(jié)束,可以在執(zhí)行任何計(jì)算之前通過(guò)獲取所有數(shù)據(jù)來(lái)處理有界流,處理有界流不需要有序獲取,因?yàn)榭梢允冀K對(duì)有界數(shù)據(jù)集進(jìn)行排序,有界流的處理也稱(chēng)為批處理。

這種以流為世界觀的架構(gòu),獲得的最大好處就是具有極低的延遲。
2.3 分層api
最底層級(jí)的抽象僅僅提供了有狀態(tài)流,它將通過(guò)過(guò)程函數(shù)(Process Function)被嵌入到DataStream API中。底層過(guò)程函數(shù)(Process Function) 與 DataStream API 相集成,使其可以對(duì)某些特定的操作進(jìn)行底層的抽象,它允許用戶(hù)可以自由地處理來(lái)自一個(gè)或多個(gè)數(shù)據(jù)流的事件,并使用一致的容錯(cuò)的狀態(tài)。除此之外,用戶(hù)可以注冊(cè)事件時(shí)間并處理時(shí)間回調(diào),從而使程序可以處理復(fù)雜的計(jì)算。
實(shí)際上,大多數(shù)應(yīng)用并不需要上述的底層抽象,而是針對(duì)核心API(Core APIs) 進(jìn)行編程,比如DataStream API(有界或無(wú)界流數(shù)據(jù))以及DataSet API(有界數(shù)據(jù)集)。這些API為數(shù)據(jù)處理提供了通用的構(gòu)建模塊,比如由用戶(hù)定義的多種形式的轉(zhuǎn)換(transformations),連接(joins),聚合(aggregations),窗口操作(windows)等等。DataSet API 為有界數(shù)據(jù)集提供了額外的支持,例如循環(huán)與迭代。這些API處理的數(shù)據(jù)類(lèi)型以類(lèi)(classes)的形式由各自的編程語(yǔ)言所表示。
Table API 是以表為中心的聲明式編程,其中表可能會(huì)動(dòng)態(tài)變化(在表達(dá)流數(shù)據(jù)時(shí))。Table API遵循(擴(kuò)展的)關(guān)系模型:表有二維數(shù)據(jù)結(jié)構(gòu)(schema)(類(lèi)似于關(guān)系數(shù)據(jù)庫(kù)中的表),同時(shí)API提供可比較的操作,例如select、project、join、group-by、aggregate等。Table API程序聲明式地定義了什么邏輯操作應(yīng)該執(zhí)行,而不是準(zhǔn)確地確定這些操作代碼的看上去如何 。 盡管Table API可以通過(guò)多種類(lèi)型的用戶(hù)自定義函數(shù)(UDF)進(jìn)行擴(kuò)展,其仍不如核心API更具表達(dá)能力,但是使用起來(lái)卻更加簡(jiǎn)潔(代碼量更少)。除此之外,Table API程序在執(zhí)行之前會(huì)經(jīng)過(guò)內(nèi)置優(yōu)化器進(jìn)行優(yōu)化。
你可以在表與 DataStream/DataSet 之間無(wú)縫切換,以允許程序?qū)?nbsp;Table API 與 DataStream 以及 DataSet 混合使用。
Flink提供的最高層級(jí)的抽象是 SQL 。這一層抽象在語(yǔ)法與表達(dá)能力上與 Table API 類(lèi)似,但是是以SQL查詢(xún)表達(dá)式的形式表現(xiàn)程序。SQL抽象與Table API交互密切,同時(shí)SQL查詢(xún)可以直接在Table API定義的表上執(zhí)行。
2.4 支持有狀態(tài)計(jì)算
Flink在1.4版本中實(shí)現(xiàn)了狀態(tài)管理,所謂狀態(tài)管理就是在流失計(jì)算過(guò)程中將算子的中間結(jié)果保存在內(nèi)存或者文件系統(tǒng)中,等下一個(gè)事件進(jìn)入算子后可以讓當(dāng)前事件的值與歷史值進(jìn)行匯總累計(jì)。
2.5 支持exactly-once語(yǔ)義
在分布式系統(tǒng)中,組成系統(tǒng)的各個(gè)計(jì)算機(jī)是獨(dú)立的。這些計(jì)算機(jī)有可能fail。
一個(gè)sender發(fā)送一條message到receiver。根據(jù)receiver出現(xiàn)fail時(shí)sender如何處理fail,可以將message delivery分為三種語(yǔ)義:
At Most once: 對(duì)于一條message,receiver最多收到一次(0次或1次).
可以達(dá)成At Most Once的策略:
sender把message發(fā)送給receiver.無(wú)論receiver是否收到message,sender都不再重發(fā)message.
At Least once: 對(duì)于一條message,receiver最少收到一次(1次及以上).
可以達(dá)成At Least Once的策略:
sender把message發(fā)送給receiver.當(dāng)receiver在規(guī)定時(shí)間內(nèi)沒(méi)有回復(fù)ACK或回復(fù)了error信息,那么sender重發(fā)這條message給receiver,直到sender收到receiver的ACK.
Exactly once: 對(duì)于一條message,receiver確保只收到一次
2.6 支持事件時(shí)間(EventTime)
目前大多數(shù)框架時(shí)間窗口計(jì)算,都是采用當(dāng)前系統(tǒng)時(shí)間,以時(shí)間為單位進(jìn)行的聚合計(jì)算只能反應(yīng)數(shù)據(jù)到達(dá)計(jì)算引擎的時(shí)間,而并不是實(shí)際業(yè)務(wù)時(shí)間
第二章 快速上手
1 搭建maven工程 flink-2019
1.1、pom文件
|
<?xml version="1.0" encoding="UTF-8"?>
<build>
|
1.2 添加scala框架 和 scala文件夾
2 批處理wordcount
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
object WordCountBeach {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val input = "F:\\input\\words.txt"
val ds: DataSet[String] = env.readTextFile(input)
import org.apache.flink.api.scala._
val aggDs = ds.flatMap(_.split(" ")).map((_, 1)).groupBy(0).sum(1)
aggDs.print()
}
}
|
def main(args: Array[String]): Unit = {
|
注意:Flink程序支持java 和 scala兩種語(yǔ)言,本課程中以scala語(yǔ)言為主。
在引入包中,有java和scala兩種包時(shí)注意要使用scala的包
3 流處理 wordcount
|
import org.apache.flink.api.java.utils.ParameterTool
|
測(cè)試
在linux系統(tǒng)中用
|
nc -lk 7777 |
進(jìn)行發(fā)送測(cè)試
第三章 Flink部署
1 standalone模式
1.1 安裝
解壓縮 flink-1.7.0-bin-hadoop27-scala_2.11.tgz
修改 flink/conf/flink-conf.yaml 文件
注意:yaml文件必須是冒號(hào)+空格
修改 /conf/slave文件
.分發(fā)給 另外兩臺(tái)機(jī)子
啟動(dòng)
訪問(wèn)http://hadoop1:8081
1.2 提交任務(wù)
1) 準(zhǔn)備數(shù)據(jù)文件
2) 把含數(shù)據(jù)文件的文件夾,分發(fā)到taskmanage 機(jī)器中
由于讀取數(shù)據(jù)是從本地磁盤(pán)讀取,實(shí)際任務(wù)會(huì)被分發(fā)到taskmanage的機(jī)器中,所以要把目標(biāo)文件分發(fā)。
3) 執(zhí)行程序
|
./flink run -c com.atguigu.flink.app.BatchWcApp /ext/flink0503-1.0-SNAPSHOT.jar --input /applog/flink/input.txt --output /applog/flink/output.csv |
4) 到目標(biāo)文件夾中查看計(jì)算結(jié)果
注意:計(jì)算結(jié)果根據(jù)會(huì)保存到taskmanage的機(jī)器下,不會(huì)再jobmanage下。
5) 在webui控制臺(tái)查看計(jì)算過(guò)程
2 yarn模式
1) 啟動(dòng)hadoop集群
2) 啟動(dòng)yarn-session
|
./yarn-session.sh -n 2 -s 2 -jm 1024 -tm 1024 -nm test -d |
其中:
-n(--container):TaskManager的數(shù)量。
-s(--slots): 每個(gè)TaskManager的slot數(shù)量,默認(rèn)一個(gè)slot一個(gè)core,默認(rèn)每個(gè)taskmanager的slot的個(gè)數(shù)為1,有時(shí)可以多一些taskmanager,做冗余。
-jm:JobManager的內(nèi)存(單位MB)。
-tm:每個(gè)taskmanager的內(nèi)存(單位MB)。
-nm:yarn 的appName(現(xiàn)在yarn的ui上的名字)。
-d:后臺(tái)執(zhí)行。
3) 執(zhí)行任務(wù)
|
./flink run -m yarn-cluster -c com.atguigu.flink.app.BatchWcApp /ext/flink0503-1.0-SNAPSHOT.jar --input /applog/flink/input.txt --output /applog/flink/output5.csv |
4) 去yarn控制臺(tái)查看任務(wù)狀態(tài)
第四章 Flink運(yùn)行架構(gòu)
1 任務(wù)提交流程(yarn模式)
圖 Yarn模式任務(wù)提交流程
Flink任務(wù)提交后,Client向HDFS上傳Flink的Jar包和配置,之后向Yarn ResourceManager提交任務(wù),ResourceManager分配Container資源并通知對(duì)應(yīng)的NodeManager啟動(dòng)ApplicationMaster,ApplicationMaster啟動(dòng)后加載Flink的Jar包和配置構(gòu)建環(huán)境,然后啟動(dòng)JobManager,之后ApplicationMaster向ResourceManager申請(qǐng)資源啟動(dòng)TaskManager,ResourceManager分配Container資源后,由ApplicationMaster通知資源所在節(jié)點(diǎn)的NodeManager啟動(dòng)TaskManager,NodeManager加載Flink的Jar包和配置構(gòu)建環(huán)境并啟動(dòng)TaskManager,TaskManager啟動(dòng)后向JobManager發(fā)送心跳包,并等待JobManager向其分配任務(wù)。
2 任務(wù)調(diào)度原理

圖 任務(wù)調(diào)度原理
客戶(hù)端不是運(yùn)行時(shí)和程序執(zhí)行的一部分,但它用于準(zhǔn)備并發(fā)送dataflow(JobGraph)給Master(JobManager),然后,客戶(hù)端斷開(kāi)連接或者維持連接以等待接收計(jì)算結(jié)果。
當(dāng) Flink 集群?jiǎn)?dòng)后,首先會(huì)啟動(dòng)一個(gè) JobManger 和一個(gè)或多個(gè)的 TaskManager。由 Client 提交任務(wù)給 JobManager,JobManager 再調(diào)度任務(wù)到各個(gè) TaskManager 去執(zhí)行,然后 TaskManager 將心跳和統(tǒng)計(jì)信息匯報(bào)給 JobManager。TaskManager 之間以流的形式進(jìn)行數(shù)據(jù)的傳輸。上述三者均為獨(dú)立的 JVM 進(jìn)程。
Client 為提交 Job 的客戶(hù)端,可以是運(yùn)行在任何機(jī)器上(與 JobManager 環(huán)境連通即可)。提交 Job 后,Client 可以結(jié)束進(jìn)程(Streaming的任務(wù)),也可以不結(jié)束并等待結(jié)果返回。
JobManager 主要負(fù)責(zé)調(diào)度 Job 并協(xié)調(diào) Task 做 checkpoint,職責(zé)上很像 Storm 的 Nimbus。從 Client 處接收到 Job 和 JAR 包等資源后,會(huì)生成優(yōu)化后的執(zhí)行計(jì)劃,并以 Task 的單元調(diào)度到各個(gè) TaskManager 去執(zhí)行。
TaskManager 在啟動(dòng)的時(shí)候就設(shè)置好了槽位數(shù)(Slot),每個(gè) slot 能啟動(dòng)一個(gè) Task,Task 為線程。從 JobManager 處接收需要部署的 Task,部署啟動(dòng)后,與自己的上游建立 Netty 連接,接收數(shù)據(jù)并處理。
關(guān)于執(zhí)行圖
Flink 中的執(zhí)行圖可以分成四層:StreamGraph -> JobGraph -> ExecutionGraph -> 物理執(zhí)行圖。
StreamGraph:是根據(jù)用戶(hù)通過(guò) Stream API 編寫(xiě)的代碼生成的最初的圖。用來(lái)表示程序的拓?fù)浣Y(jié)構(gòu)。
JobGraph:StreamGraph經(jīng)過(guò)優(yōu)化后生成了 JobGraph,提交給 JobManager 的數(shù)據(jù)結(jié)構(gòu)。主要的優(yōu)化為,將多個(gè)符合條件的節(jié)點(diǎn) chain 在一起作為一個(gè)節(jié)點(diǎn),這樣可以減少數(shù)據(jù)在節(jié)點(diǎn)之間流動(dòng)所需要的序列化/反序列化/傳輸消耗。
ExecutionGraph:JobManager 根據(jù) JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的并行化版本,是調(diào)度層最核心的數(shù)據(jù)結(jié)構(gòu)。
物理執(zhí)行圖:JobManager 根據(jù) ExecutionGraph 對(duì) Job 進(jìn)行調(diào)度后,在各個(gè)TaskManager 上部署 Task 后形成的“圖”,并不是一個(gè)具體的數(shù)據(jù)結(jié)構(gòu)。

3 Worker與Slots
每一個(gè)worker(TaskManager)是一個(gè)JVM進(jìn)程,它可能會(huì)在獨(dú)立的線程上執(zhí)行一個(gè)或多個(gè)subtask。為了控制一個(gè)worker能接收多少個(gè)task,worker通過(guò)task slot來(lái)進(jìn)行控制(一個(gè)worker至少有一個(gè)task slot)?!?/p>
每個(gè)task slot表示TaskManager擁有資源的一個(gè)固定大小的子集。假如一個(gè)TaskManager有三個(gè)slot,那么它會(huì)將其管理的內(nèi)存分成三份給各個(gè)slot。資源slot化意味著一個(gè)subtask將不需要跟來(lái)自其他job的subtask競(jìng)爭(zhēng)被管理的內(nèi)存,取而代之的是它將擁有一定數(shù)量的內(nèi)存儲(chǔ)備。需要注意的是,這里不會(huì)涉及到CPU的隔離,slot目前僅僅用來(lái)隔離task的受管理的內(nèi)存。
通過(guò)調(diào)整task slot的數(shù)量,允許用戶(hù)定義subtask之間如何互相隔離。如果一個(gè)TaskManager一個(gè)slot,那將意味著每個(gè)task group運(yùn)行在獨(dú)立的JVM中(該JVM可能是通過(guò)一個(gè)特定的容器啟動(dòng)的),而一個(gè)TaskManager多個(gè)slot意味著更多的subtask可以共享同一個(gè)JVM。而在同一個(gè)JVM進(jìn)程中的task將共享TCP連接(基于多路復(fù)用)和心跳消息。它們也可能共享數(shù)據(jù)集和數(shù)據(jù)結(jié)構(gòu),因此這減少了每個(gè)task的負(fù)載。

圖 TaskManager與Slot
Task Slot是靜態(tài)的概念,是指TaskManager具有的并發(fā)執(zhí)行能力,可以通過(guò)參數(shù)taskmanager.numberOfTaskSlots進(jìn)行配置,而并行度parallelism是動(dòng)態(tài)概念,即TaskManager運(yùn)行程序時(shí)實(shí)際使用的并發(fā)能力,可以通過(guò)參數(shù)parallelism.default進(jìn)行配置。
也就是說(shuō),假設(shè)一共有3個(gè)TaskManager,每一個(gè)TaskManager中的分配3個(gè)TaskSlot,也就是每個(gè)TaskManager可以接收3個(gè)task,一共9個(gè)TaskSlot,如果我們?cè)O(shè)置parallelism.default=1,即運(yùn)行程序默認(rèn)的并行度為1,9個(gè)TaskSlot只用了1個(gè),有8個(gè)空閑,因此,設(shè)置合適的并行度才能提高效率。
4 并行數(shù)據(jù)流
Flink程序的執(zhí)行具有并行、分布式的特性。在執(zhí)行過(guò)程中,一個(gè) stream 包含一個(gè)或多個(gè) stream partition ,而每一個(gè) operator 包含一個(gè)或多個(gè) operator subtask,這些operator subtasks在不同的線程、不同的物理機(jī)或不同的容器中彼此互不依賴(lài)得執(zhí)行。
一個(gè)特定operator的subtask的個(gè)數(shù)被稱(chēng)之為其parallelism(并行度)。一個(gè)stream的并行度總是等同于其producing operator的并行度。一個(gè)程序中,不同的operator可能具有不同的并行度。

圖 并行數(shù)據(jù)流
Stream在operator之間傳輸數(shù)據(jù)的形式可以是one-to-one(forwarding)的模式也可以是redistributing的模式,具體是哪一種形式,取決于operator的種類(lèi)。
One-to-one:stream(比如在source和map operator之間)維護(hù)著分區(qū)以及元素的順序。那意味著map operator的subtask看到的元素的個(gè)數(shù)以及順序跟source operator的subtask生產(chǎn)的元素的個(gè)數(shù)、順序相同,map、fliter、flatMap等算子都是one-to-one的對(duì)應(yīng)關(guān)系。
? 類(lèi)似于spark中的窄依賴(lài)
Redistributing:stream(map()跟keyBy/window之間或者keyBy/window跟sink之間)的分區(qū)會(huì)發(fā)生改變。每一個(gè)operator subtask依據(jù)所選擇的transformation發(fā)送數(shù)據(jù)到不同的目標(biāo)subtask。例如,keyBy() 基于hashCode重分區(qū)、broadcast和rebalance會(huì)隨機(jī)重新分區(qū),這些算子都會(huì)引起redistribute過(guò)程,而redistribute過(guò)程就類(lèi)似于Spark中的shuffle過(guò)程。
? 類(lèi)似于spark中的寬依賴(lài)
5 task與operator chains
相同并行度的one to one操作,F(xiàn)link這樣相連的operator 鏈接在一起形成一個(gè)task,原來(lái)的operator成為里面的subtask。將operators鏈接成task是非常有效的優(yōu)化:它能減少線程之間的切換和基于緩存區(qū)的數(shù)據(jù)交換,在減少時(shí)延的同時(shí)提升吞吐量。鏈接的行為可以在編程API中進(jìn)行指定。

圖 task與operator chains
OperatorChain的優(yōu)點(diǎn)
? 減少線程切換
? 減少序列化與反序列化
? 減少延遲并且提高吞吐能力
? OperatorChain 組成條件(重要)
? 上下游算子并行度一致
? 上下游算子之間沒(méi)有數(shù)據(jù)shuffle
第五章 Flink 流處理Api
1 Environment
getExecutionEnvironment
創(chuàng)建一個(gè)執(zhí)行環(huán)境,表示當(dāng)前執(zhí)行程序的上下文。 如果程序是獨(dú)立調(diào)用的,則此方法返回本地執(zhí)行環(huán)境;如果從命令行客戶(hù)端調(diào)用程序以提交到集群,則此方法返回此集群的執(zhí)行環(huán)境,也就是說(shuō),getExecutionEnvironment會(huì)根據(jù)查詢(xún)運(yùn)行的方式?jīng)Q定返回什么樣的運(yùn)行環(huán)境,是最常用的一種創(chuàng)建執(zhí)行環(huán)境的方式。
|
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
|
如果沒(méi)有設(shè)置并行度,會(huì)以flink-conf.yaml中的配置為準(zhǔn),默認(rèn)是1
createLocalEnvironment
返回本地執(zhí)行環(huán)境,需要在調(diào)用時(shí)指定默認(rèn)的并行度。
|
val env = StreamExecutionEnvironment.createLocalEnvironment(1) |
createRemoteEnvironment
返回集群執(zhí)行環(huán)境,將Jar提交到遠(yuǎn)程服務(wù)器。需要在調(diào)用時(shí)指定JobManager的IP和端口號(hào),并指定要在集群中運(yùn)行的Jar包。
|
val env = ExecutionEnvironment.createRemoteEnvironment("jobmanager-hostname", 6123,"C://jar//flink//wordcount.jar") |
2 Source
創(chuàng)建kafka工具類(lèi)
|
object MyKafkaUtil {
|
增加業(yè)務(wù)主類(lèi) StartupApp
|
object StartupApp { def main(args: Array[String]): Unit = { } |
Flink+kafka是如何實(shí)現(xiàn)exactly-once語(yǔ)義的
Flink通過(guò)checkpoint來(lái)保存數(shù)據(jù)是否處理完成的狀態(tài)
由JobManager協(xié)調(diào)各個(gè)TaskManager進(jìn)行checkpoint存儲(chǔ),checkpoint保存在 StateBackend中,默認(rèn)StateBackend是內(nèi)存級(jí)的,也可以改為文件級(jí)的進(jìn)行持久化保存。
執(zhí)行過(guò)程實(shí)際上是一個(gè)兩段式提交,每個(gè)算子執(zhí)行完成,會(huì)進(jìn)行“預(yù)提交”,直到執(zhí)行完sink操作,會(huì)發(fā)起“確認(rèn)提交”,如果執(zhí)行失敗,預(yù)提交會(huì)放棄掉。
如果宕機(jī)需要通過(guò)StateBackend進(jìn)行恢復(fù),只能恢復(fù)所有確認(rèn)提交的操作。
3 Transform
轉(zhuǎn)換算子
3.1 map
|
val streamMap = stream.map { x => x * 2 } |
3.2 flatMap
|
val streamFlatMap = stream.flatMap{ x => x.split(" ") } |
3.3 Filter
|
val streamFilter = stream.filter{ x => x == 1 } |
3.4 KeyBy
DataStream → KeyedStream:輸入必須是Tuple類(lèi)型,邏輯地將一個(gè)流拆分成不相交的分區(qū),每個(gè)分區(qū)包含具有相同key的元素,在內(nèi)部以hash的形式實(shí)現(xiàn)的。
3.5 Reduce
KeyedStream → DataStream:一個(gè)分組數(shù)據(jù)流的聚合操作,合并當(dāng)前的元素和上次聚合的結(jié)果,產(chǎn)生一個(gè)新的值,返回的流中包含每一次聚合的結(jié)果,而不是只返回最后一次聚合的最終結(jié)果。
|
//求各個(gè)渠道的累計(jì)個(gè)數(shù)
|
flink是如何保存累計(jì)值的,
flink是一種有狀態(tài)的流計(jì)算框架,其中說(shuō)的狀態(tài)包括兩個(gè)層面:
1) operator state 主要是保存數(shù)據(jù)在流程中的處理狀態(tài),用于確保語(yǔ)義的exactly-once。
2) keyed state 主要是保存數(shù)據(jù)在計(jì)算過(guò)程中的累計(jì)值。
這兩種狀態(tài)都是通過(guò)checkpoint機(jī)制保存在StateBackend中,StateBackend可以選擇保存在內(nèi)存中(默認(rèn)使用)或者保存在磁盤(pán)文件中。
3.6 Split 和 Select
Split
圖 Split
DataStream → SplitStream:根據(jù)某些特征把一個(gè)DataStream拆分成兩個(gè)或者多個(gè)DataStream。
Select
圖 Select
SplitStream→DataStream:從一個(gè)SplitStream中獲取一個(gè)或者多個(gè)DataStream。
需求:把a(bǔ)ppstore和其他的渠道的數(shù)據(jù)單獨(dú)拆分出來(lái),做成兩個(gè)流
|
// 將appstore與其他渠道拆分拆分出來(lái) 成為兩個(gè)獨(dú)立的流
|
3.7 Connect和 CoMap
圖 Connect算子
DataStream,DataStream → ConnectedStreams:連接兩個(gè)保持他們類(lèi)型的數(shù)據(jù)流,兩個(gè)數(shù)據(jù)流被Connect之后,只是被放在了一個(gè)同一個(gè)流中,內(nèi)部依然保持各自的數(shù)據(jù)和形式不發(fā)生任何變化,兩個(gè)流相互獨(dú)立。
CoMap,CoFlatMap
圖 CoMap/CoFlatMap
ConnectedStreams → DataStream:作用于ConnectedStreams上,功能與map和flatMap一樣,對(duì)ConnectedStreams中的每一個(gè)Stream分別進(jìn)行map和flatMap處理。
|
//合并以后打印
|
3.8 Union
圖 Union
DataStream → DataStream:對(duì)兩個(gè)或者兩個(gè)以上的DataStream進(jìn)行union操作,產(chǎn)生一個(gè)包含所有DataStream元素的新DataStream。注意:如果你將一個(gè)DataStream跟它自己做union操作,在新的DataStream中,你將看到每一個(gè)元素都出現(xiàn)兩次。
|
//合并以后打印
|
Connect與 Union 區(qū)別:
1 、 Union之前兩個(gè)流的類(lèi)型必須是一樣,Connect可以不一樣,在之后的coMap中再去調(diào)整成為一樣的。
2 Connect只能操作兩個(gè)流,Union可以操作多個(gè)
4 Sink
Flink沒(méi)有類(lèi)似于spark中foreach方法,讓用戶(hù)進(jìn)行迭代的操作。雖有對(duì)外的輸出操作都要利用Sink完成。最后通過(guò)類(lèi)似如下方式完成整個(gè)任務(wù)最終輸出操作。
|
myDstream.addSink(new MySink(xxxx)) |
官方提供了一部分的框架的sink。除此以外,需要用戶(hù)自定義實(shí)現(xiàn)sink。
4.1 Kafka
pom.xml
|
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 -->
|
mykafkaUtil中增加方法
|
def getProducer(topic:String): FlinkKafkaProducer011[String] ={
|
主函數(shù)中添加sink
|
val myKafkaProducer: FlinkKafkaProducer011[String] = MyKafkaUtil.getProducer("channel_sum")
sumDstream.map( chCount=>chCount._1+":"+chCount._2 ).addSink(myKafkaProducer)
|
4.2 Redis
pom.xml
|
<!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis -->
|
|
object MyRedisUtil {
|
在主函數(shù)中調(diào)用
|
sumDstream.map( chCount=>(chCount._1,chCount._2+"" )).addSink(MyRedisUtil.getRedisSink())
|
4.3 Elasticsearch
pom.xml
|
<dependency>
|
添加MyEsUtil
|
import java.util
|
在main方法中調(diào)用
|
// 明細(xì)發(fā)送到es 中
|
4.4 JDBC 自定義sink
|
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
|
添加MyJdbcSink
|
class MyJdbcSink(sql:String ) extends RichSinkFunction[Array[Any]] { //反復(fù)調(diào)用
|
在main方法中增加
把明細(xì)保存到mysql中
|
val startUplogDstream: DataStream[StartUpLog] = dstream.map{ JSON.parseObject(_,classOf[StartUpLog])}
|
第六章 Time與Window
1 Time
在Flink的流式處理中,會(huì)涉及到時(shí)間的不同概念,如下圖所示:

圖 Flink時(shí)間概念
Event Time:是事件創(chuàng)建的時(shí)間。它通常由事件中的時(shí)間戳描述,例如采集的日志數(shù)據(jù)中,每一條日志都會(huì)記錄自己的生成時(shí)間,F(xiàn)link通過(guò)時(shí)間戳分配器訪問(wèn)事件時(shí)間戳。
Ingestion Time:是數(shù)據(jù)進(jìn)入Flink的時(shí)間。
Processing Time:是每一個(gè)執(zhí)行基于時(shí)間操作的算子的本地系統(tǒng)時(shí)間,與機(jī)器相關(guān),默認(rèn)的時(shí)間屬性就是Processing Time。
例如,一條日志進(jìn)入Flink的時(shí)間為2017-11-12 10:00:00.123,到達(dá)Window的系統(tǒng)時(shí)間為2017-11-12 10:00:01.234,日志的內(nèi)容如下:
2017-11-02 18:37:15.624 INFO Fail over to rm2
對(duì)于業(yè)務(wù)來(lái)說(shuō),要統(tǒng)計(jì)1min內(nèi)的故障日志個(gè)數(shù),哪個(gè)時(shí)間是最有意義的?—— eventTime,因?yàn)槲覀円鶕?jù)日志的生成時(shí)間進(jìn)行統(tǒng)計(jì)。
2 Window
2.1 Window概述
streaming流式計(jì)算是一種被設(shè)計(jì)用于處理無(wú)限數(shù)據(jù)集的數(shù)據(jù)處理引擎,而無(wú)限數(shù)據(jù)集是指一種不斷增長(zhǎng)的本質(zhì)上無(wú)限的數(shù)據(jù)集,而window是一種切割無(wú)限數(shù)據(jù)為有限塊進(jìn)行處理的手段。
Window是無(wú)限數(shù)據(jù)流處理的核心,Window將一個(gè)無(wú)限的stream拆分成有限大小的“buckets”桶,我們可以在這些桶上做計(jì)算操作。
2.2 Window類(lèi)型
Window可以分成兩類(lèi):
l CountWindow:按照指定的數(shù)據(jù)條數(shù)生成一個(gè)Window,與時(shí)間無(wú)關(guān)。
l TimeWindow:按照時(shí)間生成Window。
對(duì)于TimeWindow,可以根據(jù)窗口實(shí)現(xiàn)原理的不同分成三類(lèi):滾動(dòng)窗口(Tumbling Window)、滑動(dòng)窗口(Sliding Window)和會(huì)話(huà)窗口(Session Window)。
1. 滾動(dòng)窗口(Tumbling Windows)
將數(shù)據(jù)依據(jù)固定的窗口長(zhǎng)度對(duì)數(shù)據(jù)進(jìn)行切片。
特點(diǎn):時(shí)間對(duì)齊,窗口長(zhǎng)度固定,沒(méi)有重疊。
滾動(dòng)窗口分配器將每個(gè)元素分配到一個(gè)指定窗口大小的窗口中,滾動(dòng)窗口有一個(gè)固定的大小,并且不會(huì)出現(xiàn)重疊。例如:如果你指定了一個(gè)5分鐘大小的滾動(dòng)窗口,窗口的創(chuàng)建如下圖所示:

圖 滾動(dòng)窗口
適用場(chǎng)景:適合做BI統(tǒng)計(jì)等(做每個(gè)時(shí)間段的聚合計(jì)算)。
2. 滑動(dòng)窗口(Sliding Windows)
滑動(dòng)窗口是固定窗口的更廣義的一種形式,滑動(dòng)窗口由固定的窗口長(zhǎng)度和滑動(dòng)間隔組成。
特點(diǎn):時(shí)間對(duì)齊,窗口長(zhǎng)度固定,有重疊。
滑動(dòng)窗口分配器將元素分配到固定長(zhǎng)度的窗口中,與滾動(dòng)窗口類(lèi)似,窗口的大小由窗口大小參數(shù)來(lái)配置,另一個(gè)窗口滑動(dòng)參數(shù)控制滑動(dòng)窗口開(kāi)始的頻率。因此,滑動(dòng)窗口如果滑動(dòng)參數(shù)小于窗口大小的話(huà),窗口是可以重疊的,在這種情況下元素會(huì)被分配到多個(gè)窗口中。
例如,你有10分鐘的窗口和5分鐘的滑動(dòng),那么每個(gè)窗口中5分鐘的窗口里包含著上個(gè)10分鐘產(chǎn)生的數(shù)據(jù),如下圖所示:

圖 滑動(dòng)窗口
適用場(chǎng)景:對(duì)最近一個(gè)時(shí)間段內(nèi)的統(tǒng)計(jì)(求某接口最近5min的失敗率來(lái)決定是否要報(bào)警)。
3. 會(huì)話(huà)窗口(Session Windows)
由一系列事件組合一個(gè)指定時(shí)間長(zhǎng)度的timeout間隙組成,類(lèi)似于web應(yīng)用的session,也就是一段時(shí)間沒(méi)有接收到新數(shù)據(jù)就會(huì)生成新的窗口。
特點(diǎn):時(shí)間無(wú)對(duì)齊。
session窗口分配器通過(guò)session活動(dòng)來(lái)對(duì)元素進(jìn)行分組,session窗口跟滾動(dòng)窗口和滑動(dòng)窗口相比,不會(huì)有重疊和固定的開(kāi)始時(shí)間和結(jié)束時(shí)間的情況,相反,當(dāng)它在一個(gè)固定的時(shí)間周期內(nèi)不再收到元素,即非活動(dòng)間隔產(chǎn)生,那個(gè)這個(gè)窗口就會(huì)關(guān)閉。一個(gè)session窗口通過(guò)一個(gè)session間隔來(lái)配置,這個(gè)session間隔定義了非活躍周期的長(zhǎng)度,當(dāng)這個(gè)非活躍周期產(chǎn)生,那么當(dāng)前的session將關(guān)閉并且后續(xù)的元素將被分配到新的session窗口中去。

圖 會(huì)話(huà)窗口
3 Window API
TimeWindow
TimeWindow是將指定時(shí)間范圍內(nèi)的所有數(shù)據(jù)組成一個(gè)window,一次對(duì)一個(gè)window里面的所有數(shù)據(jù)進(jìn)行計(jì)算。
1. 滾動(dòng)窗口
Flink默認(rèn)的時(shí)間窗口根據(jù)Processing Time 進(jìn)行窗口的劃分,將Flink獲取到的數(shù)據(jù)根據(jù)進(jìn)入Flink的時(shí)間劃分到不同的窗口中。
|
val keyedStream: KeyedStream[(String, Int), Tuple] = startUplogDstream.map(startuplog=>(startuplog.ch,1)).keyBy(0)
|
時(shí)間間隔可以通過(guò)Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一個(gè)來(lái)指定。
2. 滑動(dòng)窗口(SlidingEventTimeWindows)
滑動(dòng)窗口和滾動(dòng)窗口的函數(shù)名是完全一致的,只是在傳參數(shù)時(shí)需要傳入兩個(gè)參數(shù),一個(gè)是window_size,一個(gè)是sliding_size。
下面代碼中的sliding_size設(shè)置為了2s,也就是說(shuō),窗口每2s就計(jì)算一次,每一次計(jì)算的window范圍是5s內(nèi)的所有元素。
|
val keyedStream: KeyedStream[(String, Int), Tuple] = startUplogDstream.map(startuplog=>(startuplog.ch,1)).keyBy(0)
|
時(shí)間間隔可以通過(guò)Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一個(gè)來(lái)指定。
CountWindow
CountWindow根據(jù)窗口中相同key元素的數(shù)量來(lái)觸發(fā)執(zhí)行,執(zhí)行時(shí)只計(jì)算元素?cái)?shù)量達(dá)到窗口大小的key對(duì)應(yīng)的結(jié)果。
注意:CountWindow的window_size指的是相同Key的元素的個(gè)數(shù),不是輸入的所有元素的總數(shù)。
1 滾動(dòng)窗口
默認(rèn)的CountWindow是一個(gè)滾動(dòng)窗口,只需要指定窗口大小即可,當(dāng)元素?cái)?shù)量達(dá)到窗口大小時(shí),就會(huì)觸發(fā)窗口的執(zhí)行。
|
val keyedStream: KeyedStream[(String, Int), Tuple] = startUplogDstream.map(startuplog=>(startuplog.ch,1)).keyBy(0)
|
2 滑動(dòng)窗口
滑動(dòng)窗口和滾動(dòng)窗口的函數(shù)名是完全一致的,只是在傳參數(shù)時(shí)需要傳入兩個(gè)參數(shù),一個(gè)是window_size,一個(gè)是sliding_size。
下面代碼中的sliding_size設(shè)置為了2,也就是說(shuō),每收到兩個(gè)相同key的數(shù)據(jù)就計(jì)算一次,每一次計(jì)算的window范圍是5個(gè)元素。
|
val keyedStream: KeyedStream[(String, Int), Tuple] = startUplogDstream.map(startuplog=>(startuplog.ch,1)).keyBy(0)
|
第七章 EventTime與Window
1 EventTime的引入
在Flink的流式處理中,絕大部分的業(yè)務(wù)都會(huì)使用eventTime,一般只在eventTime無(wú)法使用時(shí),才會(huì)被迫使用ProcessingTime或者IngestionTime。
如果要使用EventTime,那么需要引入EventTime的時(shí)間屬性,引入方式如下所示:
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 從調(diào)用時(shí)刻開(kāi)始給env創(chuàng)建的每一個(gè)stream追加時(shí)間特征
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
2 Watermark
2.1 基本概念
我們知道,流處理從事件產(chǎn)生,到流經(jīng)source,再到operator,中間是有一個(gè)過(guò)程和時(shí)間的,雖然大部分情況下,流到operator的數(shù)據(jù)都是按照事件產(chǎn)生的時(shí)間順序來(lái)的,但是也不排除由于網(wǎng)絡(luò)、分布式等原因,導(dǎo)致亂序的產(chǎn)生,所謂亂序,就是指Flink接收到的事件的先后順序不是嚴(yán)格按照事件的Event Time順序排列的。

圖 數(shù)據(jù)的亂序
那么此時(shí)出現(xiàn)一個(gè)問(wèn)題,一旦出現(xiàn)亂序,如果只根據(jù)eventTime決定window的運(yùn)行,我們不能明確數(shù)據(jù)是否全部到位,但又不能無(wú)限期的等下去,此時(shí)必須要有個(gè)機(jī)制來(lái)保證一個(gè)特定的時(shí)間后,必須觸發(fā)window去進(jìn)行計(jì)算了,這個(gè)特別的機(jī)制,就是Watermark。
Watermark是一種衡量Event Time進(jìn)展的機(jī)制,它是數(shù)據(jù)本身的一個(gè)隱藏屬性,數(shù)據(jù)本身攜帶著對(duì)應(yīng)的Watermark。
Watermark是用于處理亂序事件的,而正確的處理亂序事件,通常用Watermark機(jī)制結(jié)合window來(lái)實(shí)現(xiàn)。
數(shù)據(jù)流中的Watermark用于表示timestamp小于Watermark的數(shù)據(jù),都已經(jīng)到達(dá)了,因此,window的執(zhí)行也是由Watermark觸發(fā)的。
Watermark可以理解成一個(gè)延遲觸發(fā)機(jī)制,我們可以設(shè)置Watermark的延時(shí)時(shí)長(zhǎng)t,每次系統(tǒng)會(huì)校驗(yàn)已經(jīng)到達(dá)的數(shù)據(jù)中最大的maxEventTime,然后認(rèn)定eventTime小于maxEventTime - t的所有數(shù)據(jù)都已經(jīng)到達(dá),如果有窗口的停止時(shí)間等于maxEventTime – t,那么這個(gè)窗口被觸發(fā)執(zhí)行。
有序流的Watermarker如下圖所示:(Watermark設(shè)置為0)

圖 有序數(shù)據(jù)的Watermark
亂序流的Watermarker如下圖所示:(Watermark設(shè)置為2)

圖 無(wú)序數(shù)據(jù)的Watermark
當(dāng)Flink接收到每一條數(shù)據(jù)時(shí),都會(huì)產(chǎn)生一條Watermark,這條Watermark就等于當(dāng)前所有到達(dá)數(shù)據(jù)中的maxEventTime - 延遲時(shí)長(zhǎng),也就是說(shuō),Watermark是由數(shù)據(jù)攜帶的,一旦數(shù)據(jù)攜帶的Watermark比當(dāng)前未觸發(fā)的窗口的停止時(shí)間要晚,那么就會(huì)觸發(fā)相應(yīng)窗口的執(zhí)行。由于Watermark是由數(shù)據(jù)攜帶的,因此,如果運(yùn)行過(guò)程中無(wú)法獲取新的數(shù)據(jù),那么沒(méi)有被觸發(fā)的窗口將永遠(yuǎn)都不被觸發(fā)。
上圖中,我們?cè)O(shè)置的允許最大延遲到達(dá)時(shí)間為2s,所以時(shí)間戳為7s的事件對(duì)應(yīng)的Watermark是5s,時(shí)間戳為12s的事件的Watermark是10s,如果我們的窗口1是1s~5s,窗口2是6s~10s,那么時(shí)間戳為7s的事件到達(dá)時(shí)的Watermarker恰好觸發(fā)窗口1,時(shí)間戳為12s的事件到達(dá)時(shí)的Watermark恰好觸發(fā)窗口2。
Watermark 就是觸發(fā)前一窗口的“關(guān)窗時(shí)間”,一旦觸發(fā)關(guān)門(mén)那么以當(dāng)前時(shí)刻為準(zhǔn)在窗口范圍內(nèi)的所有所有數(shù)據(jù)都會(huì)收入窗中。
只要沒(méi)有達(dá)到水位那么不管現(xiàn)實(shí)中的時(shí)間推進(jìn)了多久都不會(huì)觸發(fā)關(guān)窗。
2.2 Watermark的引入
|
val textWithEventTimeDstream: DataStream[(String, Long, Int)] = textWithTsDstream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.milliseconds(1000)) {
|
7.3 EvnetTimeWindow API
3.1 滾動(dòng)窗口(TumblingEventTimeWindows)
|
def main(args: Array[String]): Unit = {
|
結(jié)果是按照Event Time的時(shí)間窗口計(jì)算得出的,而無(wú)關(guān)系統(tǒng)的時(shí)間(包括輸入的快慢)。
3.2 滑動(dòng)窗口(SlidingEventTimeWindows)
|
def main(args: Array[String]): Unit = {
|
3.3 會(huì)話(huà)窗口(EventTimeSessionWindows)
相鄰兩次數(shù)據(jù)的EventTime的時(shí)間差超過(guò)指定的時(shí)間間隔就會(huì)觸發(fā)執(zhí)行。如果加入Watermark, 會(huì)在符合窗口觸發(fā)的情況下進(jìn)行延遲。到達(dá)延遲水位再進(jìn)行窗口觸發(fā)。
|
def main(args: Array[String]): Unit = {
|
第八章 Table API 與SQL
Table API是流處理和批處理通用的關(guān)系型API,Table API可以基于流輸入或者批輸入來(lái)運(yùn)行而不需要進(jìn)行任何修改。Table API是SQL語(yǔ)言的超集并專(zhuān)門(mén)為Apache Flink設(shè)計(jì)的,Table API是Scala 和Java語(yǔ)言集成式的API。與常規(guī)SQL語(yǔ)言中將查詢(xún)指定為字符串不同,Table API查詢(xún)是以Java或Scala中的語(yǔ)言嵌入樣式來(lái)定義的,具有IDE支持如:自動(dòng)完成和語(yǔ)法檢測(cè)。
1 需要引入的pom依賴(lài)
|
<dependency>
|
2 構(gòu)造表環(huán)境
|
def main(args: Array[String]): Unit = {
|
動(dòng)態(tài)表
如果流中的數(shù)據(jù)類(lèi)型是case class可以直接根據(jù)case class的結(jié)構(gòu)生成table
|
tableEnv.fromDataStream(startupLogDstream) |
或者根據(jù)字段順序單獨(dú)命名
|
tableEnv.fromDataStream(startupLogDstream,’mid,’uid .......) |
最后的動(dòng)態(tài)表可以轉(zhuǎn)換為流進(jìn)行輸出
|
table.toAppendStream[(String,String)] |
字段
用一個(gè)單引放到字段前面 來(lái)標(biāo)識(shí)字段名, 如 ‘name , ‘mid ,’amount 等
3 通過(guò)一個(gè)例子 了解TableAPI
|
//每10秒中渠道為appstore的個(gè)數(shù)
|
關(guān)于group by
1、 如果使用 groupby table轉(zhuǎn)換為流的時(shí)候只能用toRetractDstream
|
val rDstream: DataStream[(Boolean, (String, Long))] = table.toRetractStream[(String,Long)] |
2、 toRetractDstream 得到的第一個(gè)boolean型字段標(biāo)識(shí) true就是最新的數(shù)據(jù),false表示過(guò)期老數(shù)據(jù)
|
val rDstream: DataStream[(Boolean, (String, Long))] = table.toRetractStream[(String,Long)] rDstream.filter(_._1).print() |
3、 如果使用的api包括時(shí)間窗口,那么時(shí)間的字段必須,包含在group by中。
|
val table: Table = startupLogTable.filter("ch ='appstore'").window(Tumble over 10000.millis on 'ts as 'tt).groupBy('ch ,'tt).select("ch,ch.count ") |
關(guān)于時(shí)間窗口
1 用到時(shí)間窗口,必須提前聲明時(shí)間字段,如果是processTime直接在創(chuàng)建動(dòng)態(tài)表時(shí)進(jìn)行追加就可以
|
val startupLogTable: Table = tableEnv.fromDataStream(startupLogWithEtDstream,'mid,'uid,'appid,'area,'os,'ch,'logType,'vs,'logDate,'logHour,'logHourMinute,'ts.rowtime) |
2 如果是EventTime要在創(chuàng)建動(dòng)態(tài)表時(shí)聲明
|
val startupLogTable: Table = tableEnv.fromDataStream(startupLogWithEtDstream,'mid,'uid,'appid,'area,'os,'ch,'logType,'vs,'logDate,'logHour,'logHourMinute,'ps.processtime)
|
3 滾動(dòng)窗口可以使用Tumble over 10000.millis on
|
val table: Table = startupLogTable.filter("ch ='appstore'").window(Tumble over 10000.millis on 'ts as 'tt).groupBy('ch ,'tt).select("ch,ch.count ") |
4 SQL如何編寫(xiě)
|
def main(args: Array[String]): Unit = {
|
浙公網(wǎng)安備 33010602011771號(hào)