<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      Flink去重第三彈:HyperLogLog去重

      HyperLogLog算法 也就是基數估計統計算法,預估一個集合中不同數據的個數,也就是我們常說的去重統計,在redis中也存在hyperloglog 類型的結構,能夠使用12k的內存,允許誤差在0.81%的情況下統計2^64個數據,在這種大數據量情況下能夠減少存儲空間的消耗,但是前提是允許存在一定的誤差。關于HyperLogLog算法原理可以參考這篇文章:https://www.jianshu.com/p/55defda6dcd2里面做了詳細的介紹,其算法實現在開源java流式計算庫stream-lib提供了其具體實現代碼,由于代碼比較長就不貼出來(可以后臺回復hll ,獲取flink使用hll去重的完整代碼)。

      測試一下其使用效果,準備了97320不同數據:

       

      public static void main(String[] args) throws Exception{

      String filePath = "000000_0";
      BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath)));

      Set<String> values =new HashSet<>();
      HyperLogLog logLog=new HyperLogLog(0.01); //允許誤差

      String line = "";
      while ((line = br.readLine()) != null) {
      String[] s = line.split(",");
      String uuid = s[0];
      values.add(uuid);
      logLog.offer(uuid);
      }

      long rs=logLog.cardinality();
      }

      當誤差值為0.01 時; rs為98228,需要內存大小int[1366] //內部數據結構
      當誤差值為0.001時;rs為97304 ,需要內存大小int[174763]
      誤差越小也就越來越接近其真實數據,但是在這個過程中需要的內存也就越來越大,這個取舍可根據實際情況決定。

      在開發中更多希望通過sql方式來完成,那么就將hll與udaf結合起來使用,實現代碼如下:

       

      public class HLLDistinctFunction extends AggregateFunction<Long,HyperLogLog> {

      @Override public HyperLogLog createAccumulator() {
      return new HyperLogLog(0.001);
      }

      public void accumulate(HyperLogLog hll,String id){
      hll.offer(id);
      }

      @Override public Long getValue(HyperLogLog accumulator) {
      return accumulator.cardinality();
      }
      }

      定義的返回類型是long 也就是去重的結果,accumulator是一個HyperLogLog類型的結構。

      測試:

       

      case class AdData(id:Int,devId:String,datatime:Long)object Distinct1 {  def main(args: Array[String]): Unit = {
         val env=StreamExecutionEnvironment.getExecutionEnvironment
         val tabEnv=StreamTableEnvironment.create(env)
         tabEnv.registerFunction("hllDistinct",new HLLDistinctFunction)
         val kafkaConfig=new Properties()
        kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092")
         kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test1")
         val consumer=new FlinkKafkaConsumer[String]("topic1",new SimpleStringSchema,kafkaConfig)
         consumer.setStartFromLatest()
         val ds=env.addSource(consumer)
           .map(x=>{
             val s=x.split(",")
             AdData(s(0).toInt,s(1),s(2).toLong)
           })
         tabEnv.registerDataStream("pv",ds)
         val rs=tabEnv.sqlQuery(      """ select hllDistinct(devId) ,datatime
                                               from pv group by datatime
           """.stripMargin)
         rs.writeToSink(new PaulRetractStreamTableSink)
         env.execute()
       }
      }

      準備測試數據

       

      1,devId1,1577808000000
      1,devId2,1577808000000
      1,devId1,1577808000000

      得到結果:

       

      4> (true,1,1577808000000)
      4> (false,1,1577808000000)
      4> (true,2,1577808000000)

      其基本使用介紹到這里,后續還將進一步優化。

      posted @ 2020-01-12 21:12  阿甘—paul  閱讀(732)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 18禁动漫一区二区三区| 国产精品国语对白一区二区| 亚洲精品成人区在线观看| 久久国产自偷自免费一区| 午夜福利国产精品视频| 长腿校花无力呻吟娇喘| 免费无码AV一区二区波多野结衣 | 亚洲国产精品第一区二区| 国产精品成人中文字幕| 在线中文字幕国产一区| 少妇人妻综合久久中文字幕| 国产精品久久久久久免费软件| 久久爱在线视频在线观看| 国产性色av免费观看| 国产极品精品自在线不卡| 广水市| 亚洲人午夜射精精品日韩| 丝袜a∨在线一区二区三区不卡| 久久超碰色中文字幕超清| 激情久久综合精品久久人妻| 西西444www高清大胆| 国产精品黄色片| 日韩欧美猛交xxxxx无码| 九九热在线精品免费视频| 亚洲成在人线AV品善网好看| 成午夜福利人试看120秒| 国产成人精品久久性色av| 成年无码av片在线蜜芽| 亚洲成色在线综合网站| 韩城市| 亚洲一区成人av在线| 她也色tayese在线视频| 国产精品制服丝袜无码| 99久久精品久久久久久婷婷| 推油少妇久久99久久99久久| 亚洲一区二区三成人精品| 国产中文字幕一区二区| 国产玩具酱一区二区三区| 囯产精品一区二区三区线| 亚洲国产中文字幕在线视频综合| 国产精品自拍自在线播放|