<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      kafka使用getOffsetsBefore()獲取獲取offset異常分析

      根據時間戳獲取kafka的topic的偏移量,結果獲取的偏移量量數據組的長度為0,就會出現如下的數組下標越界的異常,實現的原理是使用了kafka的getOffsetsBefore()方法:

       

      Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException : 0

           at co.gridport.kafka.hadoop.KafkaInputFetcher.getOffset(KafkaInputFetcher.java:126)

           at co.gridport.kafka.hadoop.TestKafkaInputFetcher.testGetOffset(TestKafkaInputFetcher.java:68)

           at co.gridport.kafka.hadoop.TestKafkaInputFetcher.main(TestKafkaInputFetcher.java:80)

      OffsetResponse(0,Map([barrage_detail,0] -> error: kafka.common.UnknownException offsets: ))

      源碼如下:

      /*

            * 得到partition的offset Finding Starting Offset for Reads

            */

      public Long getOffset(Long time) throws IOException {

                 TopicAndPartition topicAndPartition = new TopicAndPartition(this.topic , this.partition );

                 Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();

      requestInfo.put( topicAndPartition, new PartitionOffsetRequestInfo(time, 1));

                 kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(

      requestInfo, kafka.api.OffsetRequest.CurrentVersion(), this. client_id);

                 OffsetResponse response = this. kafka_consumer.getOffsetsBefore( request);

      if ( response.hasError()) {

      log.error( "Error fetching data Offset Data the Broker. Reason: " + response.errorCode(this.topic, this. partition));

      throw new IOException ( "Error fetching kafka Offset by time:" + response.errorCode(this.topic, this. partition));

                 }

      //         if (response.offsets(this.topic, this.partition).length == 0){

      //              return getOffset(kafka.api.OffsetRequest

      //                         .EarliestTime());

      //         }

      return response.offsets( this. topic, this. partition)[0];

           }

      返回的response對象會有error: kafka.common.UnknownException offsets如下異常:

      OffsetResponse(0,Map([barrage_detail,0] -> error: kafka.common.UnknownException offsets: ))

      同時呢,response.hasError()檢查不到error。

      是什么原因造成了response.offsets(this.topic,this.partition)的返回數組的長度為0呢?

      分析了getOffsetsBefore()方法的源碼,并做源碼了大量的測試,終于重現了這種情況?

      1.getOffsetsBefore()的功能以及實現原理:

      getOffsetsBefore的功能是返回某個時間點前的maxOffsetNum個offset(時間點指的是kafka日志文件的最后修改時間,offset指的是log文件名中的offset,這個offset是該日志文件的第一條記錄的offset,即base offset;maxNumOffsets參數即返回結果的最大個數,如果該參數為2,就返回指定時間點前的2個offset,如果是負數,就報邏輯錯誤,怎么能聲明一個長度為負數的數組呢,呵呵);

      根據這個實現原理,所以返回的結果長度為0是合理的,反映的是這個時間點前沒有kafka日志這種情況,該情況自然就沒有offset了。

      說明我們指定的時間參數太早了,正常的時間范圍為:最早的時間之后的時間參數都可以有返回值。

      其實合理的處理方式應該為如果這個時間點前沒有值,就返回最早的offset了,對api的使用者就友好多了我們可以自己來實現這個功能。

      代碼如下:

       

      /*

            * 得到partition的offset Finding Starting Offset for Reads

            */

      public Long getOffset(Long time ) throws IOException {

                 TopicAndPartition topicAndPartition = new TopicAndPartition(this .topic , this.partition);

                 Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();

      requestInfo.put( topicAndPartition, new PartitionOffsetRequestInfo(time, 1));

                 kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(

      requestInfo, kafka.api.OffsetRequest.CurrentVersion(), this. client_id);

                 OffsetResponse response = this. kafka_consumer.getOffsetsBefore( request);

      if ( response.hasError()) {

      log.error( "Error fetching data Offset Data the Broker. Reason: " + response.errorCode(this.topic, this. partition));

      throw new IOException ( "Error fetching kafka Offset by time:" + response.errorCode(this.topic, this. partition));

                 }

                  //如果返回的數據長度為0,就獲取最早的offset。

      if ( response.offsets( this. topic, this. partition). length == 0){

      return getOffset(kafka.api.OffsetRequest

                                 . EarliestTime());

                 }

      return response.offsets( this. topic, this. partition)[0];

           }

      posted @ 2016-03-07 11:09  丹江湖畔養蜂子趙大爹  閱讀(4442)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 国产精品综合av一区二区| 昌都县| 人妻少妇精品视频专区| 国内自拍视频一区二区三区| 国产成人综合久久亚洲精品| 中文字幕人妻色偷偷久久| 在线中文字幕国产精品| 少妇被粗大的猛烈xx动态图| 亚洲精品一区二区三区片| 电影在线观看+伦理片| 亚洲av永久无码精品水牛影视 | 亚洲AV无码不卡在线播放| 国产成人精品无人区一区| 91中文字幕在线一区| 国产精品自在拍首页视频8| 国产高清在线精品一区二区三区| 男女猛烈无遮挡免费视频APP| 狠狠躁夜夜躁人人爽天天| 亚洲老妇女亚洲老熟女久| 免费人成网站免费看视频| 久九九精品免费视频| 亚洲黄色片一区二区三区| 偷拍专区一区二区三区| 亚洲国产日韩欧美一区二区三区| 久久日产一线二线三线| 久久精品99国产精品亚洲| 中文国产日韩欧美二视频| 漂亮人妻被强中文字幕久久| 国产极品丝尤物在线观看| 中文字幕av无码免费一区| 一本久道久久综合狠狠躁av| 国产精品第一页中文字幕| 在线观看潮喷失禁大喷水无码| 午夜精品久久久久久久爽| 日本一区二区三区视频版| 成av免费大片黄在线观看| 国产精品亚洲一区二区z| 亚洲精品国产一二三区| 国产精品久久久久免费观看| 亚洲乱码一二三四区| 免费国产黄线在线观看|