Flink去重第二彈:SQL方式
在Flink去重第一彈:MapState去重中介紹了使用編碼方式完成去重,但是這種方式開發(fā)周期比較長,我們可能需要針對(duì)不同的業(yè)務(wù)邏輯實(shí)現(xiàn)不同的編碼,對(duì)于業(yè)務(wù)開發(fā)來說也需要熟悉Flink編碼,也會(huì)增加相應(yīng)的成本,我們更多希望能夠以sql的方式提供給業(yè)務(wù)開發(fā)完成自己的去重邏輯。本篇介紹如何使用sql方式完成去重。
為了與離線分析保持一致的分析語義,F(xiàn)link SQL 中提供了distinct去重方式,使用方式:
-
SELECT DISTINCT devId FROM pv
表示對(duì)設(shè)備ID進(jìn)行去重,得到一個(gè)明細(xì)結(jié)果,那么我們?cè)谑褂胐istinct來統(tǒng)計(jì)去重結(jié)果通常有兩種方式, 仍然以統(tǒng)計(jì)每日網(wǎng)站uv為例。
第一種方式
-
SELECT datatime,count(DISTINCT devId) FROM pv group by datatime
該語義表示計(jì)算網(wǎng)頁每日的uv數(shù)量,其內(nèi)部核心實(shí)現(xiàn)主要依靠DistinctAccumulator與CountAccumulator,DistinctAccumulator 內(nèi)部包含一個(gè)map結(jié)構(gòu),key 表示的是distinct的字段,value表示重復(fù)的計(jì)數(shù),CountAccumulator就是一個(gè)計(jì)數(shù)器的作用,這兩部分都是作為動(dòng)態(tài)生成聚合函數(shù)的中間結(jié)果accumulator,透過之前的聚合函數(shù)的分析可知中間結(jié)果是存儲(chǔ)在狀態(tài)里面的,也就是容錯(cuò)并且具有一致性語義的
其處理流程是:
- 將devId 添加到對(duì)應(yīng)的DistinctAccumulator對(duì)象中,首先會(huì)判斷map中是否存在該devId, 不存在則插入map中并且將對(duì)應(yīng)value記1,并且返回True;存在則將對(duì)應(yīng)的value+1更新到map中,并且返回False
- 只有當(dāng)返回True時(shí)才會(huì)對(duì)CountAccumulator做累加1的操作,以此達(dá)到計(jì)數(shù)目的
第二種方式
-
select count(*),datatime from( -
select distinct devId,datatime from pv ) a -
group by datatime
內(nèi)部是一個(gè)對(duì)devId,datatime 進(jìn)行distinct的計(jì)算,在flink內(nèi)部會(huì)轉(zhuǎn)換為以devId,datatime進(jìn)行分組的流并且進(jìn)行聚合操作,在內(nèi)部會(huì)動(dòng)態(tài)生成一個(gè)聚合函數(shù),該聚合函數(shù)createAccumulators方法生成的是一個(gè)Row(0) 的accumulator 對(duì)象,其accumulate方法是一個(gè)空實(shí)現(xiàn),也就是該聚合函數(shù)每次聚合之后返回的結(jié)果都是Row(0),通過之前對(duì)sql中聚合函數(shù)的分析(可查看GroupAggProcessFunction函數(shù)源碼), 如果聚合函數(shù)處理前后得到的值相同那么可能會(huì)不發(fā)送該條結(jié)果也可能發(fā)送一條撤回一條新增的結(jié)果,但是其最終的效果是不會(huì)影響下游計(jì)算的,在這里我們簡單理解為在處理相同的devId,datatime不會(huì)向下游發(fā)送數(shù)據(jù)即可,也就是每一對(duì)devId,datatime只會(huì)向下游發(fā)送一次數(shù)據(jù);
外部就是一個(gè)簡單的按照時(shí)間維度的計(jì)數(shù)計(jì)算,由于內(nèi)部每一組devId,datatime 只會(huì)發(fā)送一次數(shù)據(jù)到外部,那么外部對(duì)應(yīng)datatime維度的每一個(gè)devId都是唯一的一次計(jì)數(shù),得到的結(jié)果就是我們需要的去重計(jì)數(shù)結(jié)果。
兩種方式對(duì)比
- 這兩種方式最終都能得到相同的結(jié)果,但是經(jīng)過分析其在內(nèi)部實(shí)現(xiàn)上差異還是比較大,第一種在分組上選擇datatime ,內(nèi)部使用的累加器DistinctAccumulator 每一個(gè)datatime都會(huì)與之對(duì)應(yīng)一個(gè)對(duì)象,在該維度上所有的設(shè)備id, 都會(huì)存儲(chǔ)在該累加器對(duì)象的map中,而第二種選擇首先細(xì)化分組,使用datatime+devId分開存儲(chǔ),然后外部使用時(shí)間維度進(jìn)行計(jì)數(shù),簡單歸納就是:
第一種: datatime->Value{devI1,devId2..}
第二種: datatime+devId->row(0)
聚合函數(shù)中accumulator 是存儲(chǔ)在ValueState中的,第二種方式的key會(huì)比第一種方式數(shù)量上多很多,但是其ValueState占用空間卻小很多,而在實(shí)際中我們通常會(huì)選擇Rocksdb方式作為狀態(tài)后端,rocksdb中value大小是有上限的,第一種方式很容易到達(dá)上限,那么使用第二種方式會(huì)更加合適; - 這兩種方式都是全量保存設(shè)備數(shù)據(jù)的,會(huì)消耗很大的存儲(chǔ)空間,但是我們的計(jì)算通常是帶有時(shí)間屬性的,那么可以通過配置StreamQueryConfig設(shè)置狀態(tài)ttl。
![]()

浙公網(wǎng)安備 33010602011771號(hào)