<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      Flink去重第一彈:MapState去重

      去重計算應該是數據分析業務里面常見的指標計算,例如網站一天的訪問用戶數、廣告的點擊用戶數等等,離線計算是一個全量、一次性計算的過程通常可以通過distinct的方式得到去重結果,而實時計算是一種增量、長期計算過程,我們在面對不同的場景,例如數據量的大小、計算結果精準度要求等可以使用不同的方案。此篇介紹如何通過編碼方式實現精確去重,以一個實際場景為例:計算每個廣告每小時的點擊用戶數,廣告點擊日志包含:廣告位ID、用戶設備ID(idfa/imei/cookie)、點擊時間。

      實現步驟分析:

      1. 為了當天的數據可重現,這里選擇事件時間也就是廣告點擊時間作為每小時的窗口期劃分

      2. 數據分組使用廣告位ID+點擊事件所屬的小時

      3. 選擇processFunction來實現,一個狀態用來保存數據、另外一個狀態用來保存對應的數據量

      4. 計算完成之后的數據清理,按照時間進度注冊定時器清理

      實現

      廣告數據

      1. case class AdData(id:Int,devId:String,time:Long)

      分組數據

      1. case class AdKey(id:Int,time:Long)

      主流程

      1. val env=StreamExecutionEnvironment.getExecutionEnvironment

      2. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

      3.  

      4. val kafkaConfig=new Properties()

      5. kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092")

      6. kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test1")

      7. val consumer=new FlinkKafkaConsumer[String]("topic1",new SimpleStringSchema,kafkaConfig)

      8. val ds=env.addSource(consumer)

      9. .map(x=>{

      10. val s=x.split(",")

      11. AdData(s(0).toInt,s(1),s(2).toLong)

      12. }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[AdData](Time.minutes(1)) {

      13. override def extractTimestamp(element: AdData): Long = element.time

      14. })

      15. .keyBy(x=>{

      16. val endTime= TimeWindow.getWindowStartWithOffset(x.time, 0,

      17. Time.hours(1).toMilliseconds) + Time.hours(1).toMilliseconds

      18. AdKey(x.id,endTime)

      19. })

      指定時間時間屬性,這里設置允許1min的延時,可根據實際情況調整;
      時間的轉換選擇TimeWindow.getWindowStartWithOffset Flink在處理window中自帶的方法,使用起來很方便,第一個參數 表示數據時間,第二個參數offset偏移量,默認為0,正常窗口劃分都是整點方式,例如從0開始劃分,這個offset就是相對于0的偏移量,第三個參數表示窗口大小,得到的結果是數據時間所屬窗口的開始時間,這里加上了窗口大小,使用結束時間與廣告位ID作為分組的Key。

      去重邏輯
      自定義Distinct1ProcessFunction 繼承了KeyedProcessFunction, 方便起見使用輸出類型使用Void,這里直接使用打印控制臺方式查看結果,在實際中可輸出到下游做一個批量的處理然后在輸出;
      定義兩個狀態:MapState,key表示devId, value表示一個隨意的值只是為了標識,該狀態表示一個廣告位在某個小時的設備數據,如果我們使用rocksdb作為statebackend, 那么會將mapstate中key作為rocksdb中key的一部分,mapstate中value作為rocksdb中的value, rocksdb中value 大小是有上限的,這種方式可以減少rocksdb value的大小;另外一個ValueState,存儲當前MapState的數據量,是由于mapstate只能通過迭代方式獲得數據量大小,每次獲取都需要進行迭代,這種方式可以避免每次迭代。

      1. class Distinct1ProcessFunction extends KeyedProcessFunction[AdKey, AdData, Void] {

      2. var devIdState: MapState[String, Int] = _

      3. var devIdStateDesc: MapStateDescriptor[String, Int] = _

      4.  

      5. var countState: ValueState[Long] = _

      6. var countStateDesc: ValueStateDescriptor[Long] = _

      7.  

      8. override def open(parameters: Configuration): Unit = {

      9.  

      10. devIdStateDesc = new MapStateDescriptor[String, Int]("devIdState", TypeInformation.of(classOf[String]), TypeInformation.of(classOf[Int]))

      11. devIdState = getRuntimeContext.getMapState(devIdStateDesc)

      12.  

      13. countStateDesc = new ValueStateDescriptor[Long]("countState", TypeInformation.of(classOf[Long]))

      14. countState = getRuntimeContext.getState(countStateDesc)

      15. }

      16.  

      17. override def processElement(value: AdData, ctx: KeyedProcessFunction[AdKey, AdData, Void]#Context, out: Collector[Void]): Unit = {

      18.  

      19. val currW=ctx.timerService().currentWatermark()

      20. if(ctx.getCurrentKey.time+1<=currW) {

      21. println("late data:" + value)

      22. return

      23. }

      24.  

      25. val devId = value.devId

      26. devIdState.get(devId) match {

      27. case 1 => {

      28. //表示已經存在

      29. }

      30. case _ => {

      31. //表示不存在

      32. devIdState.put(devId, 1)

      33. val c = countState.value()

      34. countState.update(c + 1)

      35. //還需要注冊一個定時器

      36. ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey.time + 1)

      37. }

      38. }

      39. println(countState.value())

      40. }

      41.  

      42. override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[AdKey, AdData, Void]#OnTimerContext, out: Collector[Void]): Unit = {

      43. println(timestamp + " exec clean~~~")

      44. println(countState.value())

      45. devIdState.clear()

      46. countState.clear()

      47. }

      48. }

      數據清理通過注冊定時器方式ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey.time + 1)表示當watermark大于該小時結束時間+1就會執行清理動作,調用onTimer方法。

      在處理邏輯里面加了

      1. val currW=ctx.timerService().currentWatermark()

      2. if(ctx.getCurrentKey.time+1<=currW){

      3. println("late data:" + value)

      4. return

      5. }

      主要考慮可能會存在滯后的數據比較嚴重,會影響之前的計算結果,做了一個類似window機制里面的一個延時判斷,將延時的數據過濾掉,也可以使用OutputTag 單獨處理。

      posted @ 2020-01-12 21:09  阿甘—paul  閱讀(1193)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 麻豆国产va免费精品高清在线| 亚洲高请码在线精品av| 尤物yw193无码点击进入| 另类图片亚洲人妻中文无码| 欧美不卡无线在线一二三区观| 一区二区三区无码免费看| 性欧美vr高清极品| 濮阳市| 国产精品免费无遮挡无码永久视频| 天天拍夜夜添久久精品大| 国产成人久久777777| 在国产线视频A在线视频| gogogo高清在线观看视频中文| 久久精品无码专区免费东京热| 久久涩综合一区二区三区| 亚洲国产午夜精品理论片妓女| 一本久道中文无码字幕av| 久久亚洲国产五月综合网| 国产成人无码免费视频在线| 国产乱xxxxx97国语对白| 国产一精品一av一免费| 亚洲成a人片在线观看中文| 一个人看的www视频免费观看| 色吊丝二区三区中文字幕| 婷婷六月天在线| 人人妻人人澡人人爽| 在线中文字幕国产一区| 日本va欧美va精品发布| 国产高清在线精品一区二区三区| 在线a亚洲老鸭窝天堂| 2020国产欧洲精品网站| 天天看片视频免费观看| 四虎成人高清永久免费看| 日本成熟少妇喷浆视频| 国产AV影片麻豆精品传媒| 国产午夜福利av在线麻豆| 四虎成人在线观看免费| 亚洲の无码国产の无码步美| 韩国免费a级毛片久久| 欧美疯狂xxxxbbbb喷潮| 蜜臀久久99精品久久久久久|