Flink去重第一彈:MapState去重
去重計算應該是數據分析業務里面常見的指標計算,例如網站一天的訪問用戶數、廣告的點擊用戶數等等,離線計算是一個全量、一次性計算的過程通常可以通過distinct的方式得到去重結果,而實時計算是一種增量、長期計算過程,我們在面對不同的場景,例如數據量的大小、計算結果精準度要求等可以使用不同的方案。此篇介紹如何通過編碼方式實現精確去重,以一個實際場景為例:計算每個廣告每小時的點擊用戶數,廣告點擊日志包含:廣告位ID、用戶設備ID(idfa/imei/cookie)、點擊時間。
實現步驟分析:
-
為了當天的數據可重現,這里選擇事件時間也就是廣告點擊時間作為每小時的窗口期劃分
-
數據分組使用廣告位ID+點擊事件所屬的小時
-
選擇processFunction來實現,一個狀態用來保存數據、另外一個狀態用來保存對應的數據量
-
計算完成之后的數據清理,按照時間進度注冊定時器清理
實現
廣告數據
-
case class AdData(id:Int,devId:String,time:Long)
分組數據
-
case class AdKey(id:Int,time:Long)
主流程
-
val env=StreamExecutionEnvironment.getExecutionEnvironment -
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) -
-
val kafkaConfig=new Properties() -
kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092") -
kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test1") -
val consumer=new FlinkKafkaConsumer[String]("topic1",new SimpleStringSchema,kafkaConfig) -
val ds=env.addSource(consumer) -
.map(x=>{ -
val s=x.split(",") -
AdData(s(0).toInt,s(1),s(2).toLong) -
}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[AdData](Time.minutes(1)) { -
override def extractTimestamp(element: AdData): Long = element.time -
}) -
.keyBy(x=>{ -
val endTime= TimeWindow.getWindowStartWithOffset(x.time, 0, -
Time.hours(1).toMilliseconds) + Time.hours(1).toMilliseconds -
AdKey(x.id,endTime) -
})
指定時間時間屬性,這里設置允許1min的延時,可根據實際情況調整;
時間的轉換選擇TimeWindow.getWindowStartWithOffset Flink在處理window中自帶的方法,使用起來很方便,第一個參數 表示數據時間,第二個參數offset偏移量,默認為0,正常窗口劃分都是整點方式,例如從0開始劃分,這個offset就是相對于0的偏移量,第三個參數表示窗口大小,得到的結果是數據時間所屬窗口的開始時間,這里加上了窗口大小,使用結束時間與廣告位ID作為分組的Key。
去重邏輯
自定義Distinct1ProcessFunction 繼承了KeyedProcessFunction, 方便起見使用輸出類型使用Void,這里直接使用打印控制臺方式查看結果,在實際中可輸出到下游做一個批量的處理然后在輸出;
定義兩個狀態:MapState,key表示devId, value表示一個隨意的值只是為了標識,該狀態表示一個廣告位在某個小時的設備數據,如果我們使用rocksdb作為statebackend, 那么會將mapstate中key作為rocksdb中key的一部分,mapstate中value作為rocksdb中的value, rocksdb中value 大小是有上限的,這種方式可以減少rocksdb value的大小;另外一個ValueState,存儲當前MapState的數據量,是由于mapstate只能通過迭代方式獲得數據量大小,每次獲取都需要進行迭代,這種方式可以避免每次迭代。
-
class Distinct1ProcessFunction extends KeyedProcessFunction[AdKey, AdData, Void] { -
var devIdState: MapState[String, Int] = _ -
var devIdStateDesc: MapStateDescriptor[String, Int] = _ -
-
var countState: ValueState[Long] = _ -
var countStateDesc: ValueStateDescriptor[Long] = _ -
-
override def open(parameters: Configuration): Unit = { -
-
devIdStateDesc = new MapStateDescriptor[String, Int]("devIdState", TypeInformation.of(classOf[String]), TypeInformation.of(classOf[Int])) -
devIdState = getRuntimeContext.getMapState(devIdStateDesc) -
-
countStateDesc = new ValueStateDescriptor[Long]("countState", TypeInformation.of(classOf[Long])) -
countState = getRuntimeContext.getState(countStateDesc) -
} -
-
override def processElement(value: AdData, ctx: KeyedProcessFunction[AdKey, AdData, Void]#Context, out: Collector[Void]): Unit = { -
-
val currW=ctx.timerService().currentWatermark() -
if(ctx.getCurrentKey.time+1<=currW) { -
println("late data:" + value) -
return -
} -
-
val devId = value.devId -
devIdState.get(devId) match { -
case 1 => { -
//表示已經存在 -
} -
case _ => { -
//表示不存在 -
devIdState.put(devId, 1) -
val c = countState.value() -
countState.update(c + 1) -
//還需要注冊一個定時器 -
ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey.time + 1) -
} -
} -
println(countState.value()) -
} -
-
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[AdKey, AdData, Void]#OnTimerContext, out: Collector[Void]): Unit = { -
println(timestamp + " exec clean~~~") -
println(countState.value()) -
devIdState.clear() -
countState.clear() -
} -
}
數據清理通過注冊定時器方式ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey.time + 1)表示當watermark大于該小時結束時間+1就會執行清理動作,調用onTimer方法。
在處理邏輯里面加了
-
val currW=ctx.timerService().currentWatermark() -
if(ctx.getCurrentKey.time+1<=currW){ -
println("late data:" + value) -
return -
}
主要考慮可能會存在滯后的數據比較嚴重,會影響之前的計算結果,做了一個類似window機制里面的一個延時判斷,將延時的數據過濾掉,也可以使用OutputTag 單獨處理。

浙公網安備 33010602011771號