<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      Flink Pre-defined Timestamp Extractors / Watermark Emitters(預定義的時間戳提取/水位線發射器)

      https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_timestamp_extractors.html

      根據官網描述,Flink提供預定義的時間戳提取/水位線發射器。如下:

      Flink provides abstractions that allow the programmer to assign their own timestamps and emit their own watermarks.

      More specifically, one can do so by implementing one of the AssignerWithPeriodicWatermarks and AssignerWithPunctuatedWatermarks interfaces, depending on the use case.

      In a nutshell, the first will emit watermarks periodically, while the second does so based on some property of the incoming records, e.g. whenever a special element is encountered in the stream.

      AssignerWithPeriodicWatermarks介紹:

      源碼路徑:flink\flink-streaming-java\src\main\java\org\apache\flink\streaming\api\functions\AssignerWithPeriodicWatermarks.java

      /*
       * Licensed to the Apache Software Foundation (ASF) under one
       * or more contributor license agreements.  See the NOTICE file
       * distributed with this work for additional information
       * regarding copyright ownership.  The ASF licenses this file
       * to you under the Apache License, Version 2.0 (the
       * "License"); you may not use this file except in compliance
       * with the License.  You may obtain a copy of the License at
       *
       *    http://www.apache.org/licenses/LICENSE-2.0
       *
       * Unless required by applicable law or agreed to in writing, software
       * distributed under the License is distributed on an "AS IS" BASIS,
       * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       * See the License for the specific language governing permissions and
       * limitations under the License.
       */
      
      package org.apache.flink.streaming.api.functions;
      
      import org.apache.flink.api.common.ExecutionConfig;
      import org.apache.flink.streaming.api.watermark.Watermark;
      
      import javax.annotation.Nullable;
      
      /**
       * The {@code AssignerWithPeriodicWatermarks} assigns event time timestamps to elements,
       * and generates low watermarks that signal event time progress within the stream.
       * These timestamps and watermarks are used by functions and operators that operate
       * on event time, for example event time windows.
       *
       * <p>Use this class to generate watermarks in a periodical interval.
       * At most every {@code i} milliseconds (configured via
       * {@link ExecutionConfig#getAutoWatermarkInterval()}), the system will call the
       * {@link #getCurrentWatermark()} method to probe for the next watermark value.
       * The system will generate a new watermark, if the probed value is non-null
       * and has a timestamp larger than that of the previous watermark (to preserve
       * the contract of ascending watermarks).
       *
       * <p>The system may call the {@link #getCurrentWatermark()} method less often than every
       * {@code i} milliseconds, if no new elements arrived since the last call to the
       * method.
       *
       * <p>Timestamps and watermarks are defined as {@code longs} that represent the
       * milliseconds since the Epoch (midnight, January 1, 1970 UTC).
       * A watermark with a certain value {@code t} indicates that no elements with event
       * timestamps {@code x}, where {@code x} is lower or equal to {@code t}, will occur any more.
       *
       * @param <T> The type of the elements to which this assigner assigns timestamps.
       *
       * @see org.apache.flink.streaming.api.watermark.Watermark
       */
      public interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T> {
      
          /**
           * Returns the current watermark. This method is periodically called by the
           * system to retrieve the current watermark. The method may return {@code null} to
           * indicate that no new Watermark is available.
           *
           * <p>The returned watermark will be emitted only if it is non-null and its timestamp
           * is larger than that of the previously emitted watermark (to preserve the contract of
           * ascending watermarks). If the current watermark is still
           * identical to the previous one, no progress in event time has happened since
           * the previous call to this method. If a null value is returned, or the timestamp
           * of the returned watermark is smaller than that of the last emitted one, then no
           * new watermark will be generated.
           *
           * <p>The interval in which this method is called and Watermarks are generated
           * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
           *
           * @see org.apache.flink.streaming.api.watermark.Watermark
           * @see ExecutionConfig#getAutoWatermarkInterval()
           *
           * @return {@code Null}, if no watermark should be emitted, or the next watermark to emit.
           */
          @Nullable
          Watermark getCurrentWatermark();
      }

      AssignerWithPunctuatedWatermarks 接口介紹

      源碼路徑 flink\flink-streaming-java\src\main\java\org\apache\flink\streaming\api\functions\AssignerWithPunctuatedWatermarks.java

      /*
       * Licensed to the Apache Software Foundation (ASF) under one
       * or more contributor license agreements.  See the NOTICE file
       * distributed with this work for additional information
       * regarding copyright ownership.  The ASF licenses this file
       * to you under the Apache License, Version 2.0 (the
       * "License"); you may not use this file except in compliance
       * with the License.  You may obtain a copy of the License at
       *
       *    http://www.apache.org/licenses/LICENSE-2.0
       *
       * Unless required by applicable law or agreed to in writing, software
       * distributed under the License is distributed on an "AS IS" BASIS,
       * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       * See the License for the specific language governing permissions and
       * limitations under the License.
       */
      
      package org.apache.flink.streaming.api.functions;
      
      import org.apache.flink.streaming.api.watermark.Watermark;
      
      import javax.annotation.Nullable;
      
      /**
       * The {@code AssignerWithPunctuatedWatermarks} assigns event time timestamps to elements,
       * and generates low watermarks that signal event time progress within the stream.
       * These timestamps and watermarks are used by functions and operators that operate
       * on event time, for example event time windows.
       *
       * <p>Use this class if certain special elements act as markers that signify event time
       * progress, and when you want to emit watermarks specifically at certain events.
       * The system will generate a new watermark, if the probed value is non-null
       * and has a timestamp larger than that of the previous watermark (to preserve
       * the contract of ascending watermarks).
       *
       * <p>For use cases that should periodically emit watermarks based on element timestamps,
       * use the {@link AssignerWithPeriodicWatermarks} instead.
       *
       * <p>The following example illustrates how to use this timestamp extractor and watermark
       * generator. It assumes elements carry a timestamp that describes when they were created,
       * and that some elements carry a flag, marking them as the end of a sequence such that no
       * elements with smaller timestamps can come anymore.
       *
       * <pre>{@code
       * public class WatermarkOnFlagAssigner implements AssignerWithPunctuatedWatermarks<MyElement> {
       *
       *     public long extractTimestamp(MyElement element, long previousElementTimestamp) {
       *         return element.getSequenceTimestamp();
       *     }
       *
       *     public Watermark checkAndGetNextWatermark(MyElement lastElement, long extractedTimestamp) {
       *         return lastElement.isEndOfSequence() ? new Watermark(extractedTimestamp) : null;
       *     }
       * }
       * }</pre>
       *
       * <p>Timestamps and watermarks are defined as {@code longs} that represent the
       * milliseconds since the Epoch (midnight, January 1, 1970 UTC).
       * A watermark with a certain value {@code t} indicates that no elements with event
       * timestamps {@code x}, where {@code x} is lower or equal to {@code t}, will occur any more.
       *
       * @param <T> The type of the elements to which this assigner assigns timestamps.
       *
       * @see org.apache.flink.streaming.api.watermark.Watermark
       */
      public interface AssignerWithPunctuatedWatermarks<T> extends TimestampAssigner<T> {
      
          /**
           * Asks this implementation if it wants to emit a watermark. This method is called right after
           * the {@link #extractTimestamp(Object, long)} method.
           *
           * <p>The returned watermark will be emitted only if it is non-null and its timestamp
           * is larger than that of the previously emitted watermark (to preserve the contract of
           * ascending watermarks). If a null value is returned, or the timestamp of the returned
           * watermark is smaller than that of the last emitted one, then no new watermark will
           * be generated.
           *
           * <p>For an example how to use this method, see the documentation of
           * {@link AssignerWithPunctuatedWatermarks this class}.
           *
           * @return {@code Null}, if no watermark should be emitted, or the next watermark to emit.
           */
          @Nullable
          Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
      }

       

      兩種接口的DEMO:

      AssignerWithPeriodicWatermarks 接口DEMO 如:http://www.rzrgm.cn/felixzh/p/9687214.html

      AssignerWithPunctuatedWatermarks 接口DEMO如下:

      package org.apache.flink.streaming.examples.wordcount;
      
      import org.apache.flink.api.common.functions.FlatMapFunction;
      import org.apache.flink.api.common.functions.ReduceFunction;
      import org.apache.flink.api.java.tuple.Tuple3;
      import org.apache.flink.core.fs.FileSystem;
      import org.apache.flink.streaming.api.TimeCharacteristic;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
      import org.apache.flink.streaming.api.watermark.Watermark;
      import org.apache.flink.streaming.api.windowing.time.Time;
      import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
      import org.apache.flink.api.common.serialization.SimpleStringSchema;
      import org.apache.flink.util.Collector;
      
      import javax.annotation.Nullable;
      import java.text.ParseException;
      import java.text.SimpleDateFormat;
      import java.util.Properties;
      
      
      public class wcNew {
          public static void main(String[] args) throws Exception {
              final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.enableCheckpointing(35000);
              env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
              Properties props = new Properties();
              props.setProperty("bootstrap.servers", "127.0.0.1:9092");
              props.setProperty("group.id", "flink-group-debug");
              props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
              props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      
              FlinkKafkaConsumer010<String> consumer =
                  new FlinkKafkaConsumer010<>(args[0], new SimpleStringSchema(), props);
              consumer.setStartFromEarliest();
              consumer.assignTimestampsAndWatermarks(new MessageWaterEmitter());
      
              DataStream<Tuple3<String, Integer, String>> keyedStream = env
                  .addSource(consumer)
                  .flatMap(new MessageSplitter())
                  .keyBy(0)
                  .timeWindow(Time.seconds(10))
                  .reduce(new ReduceFunction<Tuple3<String, Integer, String>>() {
                      @Override
                      public Tuple3<String, Integer, String> reduce(Tuple3<String, Integer, String> t0, Tuple3<String, Integer, String> t1) throws Exception {
                          String time0 = t0.getField(2);
                          String time1 = t1.getField(2);
                          Integer count0 = t0.getField(1);
                          Integer count1 = t1.getField(1);
                          return new Tuple3<>(t0.getField(0), count0 + count1, time0 +"|"+ time1);
                      }
                  });
      
              keyedStream.writeAsText(args[1], FileSystem.WriteMode.OVERWRITE);
              keyedStream.print();
              env.execute("Flink-Kafka num count");
          }
      
          private static class MessageWaterEmitter implements AssignerWithPunctuatedWatermarks<String> {
      
              private SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd-hhmmss");
      
              /*
               * 先執行該函數,從element中提取時間戳
               *@param element  record行
               *@param previousElementTimestamp  當前的時間
               */
              @Override
              public long extractTimestamp(String element, long previousElementTimestamp) {
                  if (element != null && element.contains(",")) {
                      String[] parts = element.split(",");
                      if (parts.length == 3) {
                          try {
                              return sdf.parse(parts[2]).getTime();
                          } catch (ParseException e) {
                              e.printStackTrace();
                          }
                      }
                  }
                  return 0L;
              }
      
              /*
               * 再執行該函數,extractedTimestamp的值是extractTimestamp的返回值
               */
              @Nullable
              @Override
              public Watermark checkAndGetNextWatermark(String lastElement, long extractedTimestamp) {
                  if (lastElement != null && lastElement.contains(",")) {
                      String[] parts = lastElement.split(",");
                      if(parts.length==3) {
                          try {
                              return new Watermark(sdf.parse(parts[2]).getTime());
                          } catch (ParseException e) {
                              e.printStackTrace();
                          }
                      }
      
                  }
                  return null;
              }
          }
          private static class MessageSplitter implements FlatMapFunction<String, Tuple3<String, Integer, String>> {
      
              @Override
              public void flatMap(String s, Collector<Tuple3<String, Integer, String>> collector) throws Exception {
                  if (s != null && s.contains(",")) {
                      String[] strings = s.split(",");
                      if(strings.length==3) {
                          collector.collect(new Tuple3<>(strings[0], Integer.parseInt(strings[1]), strings[2]));
                      }
                  }
              }
          }
      }

      打包成jar包后,上傳到flink所在服務器,在控制臺輸入

      flink run -c org.apache.flink.streaming.examples.wordcount.wcNew flink-kafka.jar topic_test_numcount /tmp/numcount.txt

       控制臺輸入

      eee,1,20180504-113411
      eee,2,20180504-113415
      eee,2,20180504-113412
      eee,2,20180504-113419
      eee,1,20180504-113421

      tail -f numcount.txt 監控numcount.txt輸出 當最后一條輸入時,可以看到程序輸出了前4條的計算結果 (eee,7,20180504-113411|20180504-113415|20180504-113412|20180504-113419)

      posted @ 2018-09-21 17:39  大數據從業者FelixZh  閱讀(782)  評論(0)    收藏  舉報
      大數據從業者
      主站蜘蛛池模板: 欧美性猛交xxxx富婆| 少妇人妻偷人偷人精品| 国产精品99久久免费| 久久99精品久久久久久9| 无码一区二区三区av在线播放| 任我爽精品视频在线播放| 国产乱码日产乱码精品精| 国产av最新一区二区| 又湿又紧又大又爽A视频男| 亚洲国产欧美在线人成| 国产裸体永久免费无遮挡| 白山市| 亚洲大尺度视频在线播放| 国产草草影院ccyycom| 国产成人亚洲无码淙合青草| 起碰免费公开97在线视频| 精品不卡一区二区三区| 欧美成人www免费全部网站 | 亚成区成线在人线免费99| 免费VA国产高清大片在线| 欧美不卡无线在线一二三区观| 亚洲日本乱码熟妇色精品| 亚洲欧美在线一区中文字幕| 日韩秘 无码一区二区三区| 99国产精品国产精品久久| 日本牲交大片免费观看| 激情在线一区二区三区视频| 久久人爽人人爽人人片av| 男人猛戳女人30分钟视频大全| 久久综合亚洲色一区二区三区 | av在线播放观看国产| 少妇做爰免费视看片| 国产精品无遮挡猛进猛出| 特黄三级又爽又粗又大| 国产麻豆成人传媒免费观看| 一区二区三区精品自拍视频| 中文字幕精品亚洲无线码二区| 婷婷六月天在线| 免费a级毛视频| 亚洲av久久精品狠狠爱av| 隔壁老王国产在线精品|