<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      lenmom

      博客園 首頁 新隨筆 聯系 訂閱 管理

      反壓(Back Pressure)機制主要用來解決流處理系統中,處理速度比攝入速度慢的情況。是控制流處理中批次流量過載的有效手段。

      1 反壓機制原理

      Spark Streaming中的反壓機制是Spark 1.5.0推出的新特性,可以根據處理效率動態調整攝入速率。

      1.1 反壓定義

      當批處理時間(Batch Processing Time)大于批次間隔(Batch Interval,即 BatchDuration)時,說明處理數據的速度小于數據攝入的速度,持續時間過長或源頭數據暴增,容易造成數據在內存中堆積,最終導致Executor OOM或任務奔潰。

       

      1.2 反壓的數據源方式及限流處理

      spark streaming的數據源方式有兩種:

      1. 若是基于Receiver的數據源,可以通過設置spark.streaming.receiver.maxRate來控制最大輸入速率;
      2. 若是基于Direct的數據源(如Kafka Direct Stream),則可以通過設置spark.streaming.kafka.maxRatePerPartition來控制最大輸入速率。

      當然,在事先經過壓測,且流量高峰不會超過預期的情況下,設置這些參數一般沒什么問題。但最大值,不代表是最優值,最好還能根據每個批次處理情況來動態預估下個批次最優速率。

      在Spark 1.5.0以上,就可通過背壓機制來實現。開啟反壓機制,即設置spark.streaming.backpressure.enabled為true,Spark Streaming會自動根據處理能力來調整輸入速率,從而在流量高峰時仍能保證最大的吞吐和性能。

       

      1.3 反壓的實現原理

      Spark Streaming的反壓機制中,有以下幾個重要的組件:

      • RateController
      • RateEstimator
      • RateLimiter

      主要是通過RateController組件來實現。RateController繼承自接口StreamingListener,并實現了onBatchCompleted方法。每一個Batch處理完成后都會調用此方法,具體如下:

       override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
          val elements = batchCompleted.batchInfo.streamIdToInputInfo
      
          for {
            // 處理結束時間
            processingEnd <- batchCompleted.batchInfo.processingEndTime
            // 處理時間,即`processingEndTime` - `processingStartTime`
            workDelay <- batchCompleted.batchInfo.processingDelay
            // 在調度隊列中的等待時間,即`processingStartTime` - `submissionTime`
            waitDelay <- batchCompleted.batchInfo.schedulingDelay
            // 當前批次處理的記錄數
            elems <- elements.get(streamUID).map(_.numRecords)
          } computeAndPublish(processingEnd, elems, workDelay, waitDelay)
        }

      可以看到,接著又調用的是computeAndPublish方法,如下:

      private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
          Future[Unit] {
            // 根據處理時間、調度時間、當前Batch記錄數,預估新速率
            val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
            newRate.foreach { s =>
            // 設置新速率
              rateLimit.set(s.toLong)
            // 發布新速率
              publish(getLatestRate())
            }
          }

      更深一層,具體調用的是rateEstimator.compute方法來預估新速率,

       

      def compute(
            time: Long,
            elements: Long,
            processingDelay: Long,
            schedulingDelay: Long): Option[Double]

      RateEstimator是速率估算器,主要用來估算最大處理速率,默認的在2.2之前版本中只支持PIDRateEstimator,在以后的版本可能會支持使用其他插件,其源碼如下:

       

        def create(conf: SparkConf, batchInterval: Duration): RateEstimator =
          conf.get("spark.streaming.backpressure.rateEstimator", "pid") match {
            case "pid" =>
              val proportional = conf.getDouble("spark.streaming.backpressure.pid.proportional", 1.0)
              val integral = conf.getDouble("spark.streaming.backpressure.pid.integral", 0.2)
              val derived = conf.getDouble("spark.streaming.backpressure.pid.derived", 0.0)
              val minRate = conf.getDouble("spark.streaming.backpressure.pid.minRate", 100)
              new PIDRateEstimator(batchInterval.milliseconds, proportional, integral, derived, minRate)
          //默認的只支持pid,其他的配置拋出異常
            case estimator =>
              throw new IllegalArgumentException(s"Unknown rate estimator: $estimator")
          }

       

      以上這兩個組件都是在Driver端用于更新最大速度的,而RateLimiter是用于接收到Driver的更新通知之后更新Executor的最大處理速率的組件。RateLimiter是一個抽象類,它并不是Spark本身實現的,而是借助了第三方Google的GuavaRateLimiter來產生的。

      它實質上是一個限流器,也可以叫做令牌,如果Executor中task每秒計算的速度大于該值則阻塞,如果小于該值則通過,將流數據加入緩存中進行計算。這種機制也可以叫做令牌桶機制,圖示如下:

       

       接收到的newRate進行比較,取兩者中的最小值來作為最大處理速率,如果沒有設置,直接設置為newRate。源碼如下:

      private[receiver] def updateRate(newRate: Long): Unit =
          if (newRate > 0) {
            if (maxRateLimit > 0) {
              //如果設置了maxRateLimit則取兩者中的最小值
              rateLimiter.setRate(newRate.min(maxRateLimit))
            } else {
              rateLimiter.setRate(newRate)
            }
          }

      spark 1.5引入的反壓機制架構圖如下:

       

       

      2. 反壓機制相關參數

      參數名稱 默認值 說明
      spark.streaming.backpressure.enabled false 是否啟用反壓機制
      spark.streaming.backpressure.initialRate 初始最大接收速率。只適用于Receiver Stream,不適用于Direct Stream。
      spark.streaming.backpressure.rateEstimator pid 速率控制器,Spark 默認只支持此控制器,可自定義。
      spark.streaming.backpressure.pid.proportional 1.0 只能為非負值。當前速率與最后一批速率之間的差值對總控制信號貢獻的權重。用默認值即可。
      spark.streaming.backpressure.pid.integral 0.2 只能為非負值。比例誤差累積對總控制信號貢獻的權重。用默認值即可
      spark.streaming.backpressure.pid.derived 0 只能為非負值。比例誤差變化對總控制信號貢獻的權重。用默認值即可
      spark.streaming.backpressure.pid.minRate 100 只能為正數,最小速率

       

      3. 反壓機制的使用

      //啟用反壓
      conf.set("spark.streaming.backpressure.enabled","true")
      //最小攝入條數控制
      conf.set("spark.streaming.backpressure.pid.minRate","1")
      //最大攝入條數控制
      conf.set("spark.streaming.kafka.maxRatePerPartition","12")

      注意:

      1. 反壓機制真正起作用時需要至少處理一個批:由于反壓機制需要根據當前批的速率,預估新批的速率,所以反壓機制真正起作用前,應至少保證處理一個批。
      2. 如何保證反壓機制真正起作用前應用不會崩潰:要保證反壓機制真正起作用前應用不會崩潰,需要控制每個批次最大攝入速率。若為Direct Stream,如Kafka Direct Stream,則可以通過spark.streaming.kafka.maxRatePerPartition參數來控制。此參數代表了 每秒每個分區最大攝入的數據條數。假設BatchDuration為10秒,spark.streaming.kafka.maxRatePerPartition為12條,kafka topic 分區數為3個,則一個批(Batch)最大讀取的數據條數為360條(3*12*10=360)。同時,需要注意,該參數也代表了整個應用生命周期中的最大速率,即使是背壓調整的最大值也不會超過該參數。

       4. 查看日志

      創建速率控制器
      INFO PIDRateEstimator: Created PIDRateEstimator with proportional = 1.0, integral = 0.2, derivative = 0.0, min rate = 1.0
      計算當前批次速率
      // records 記錄數(對應WebUI: Input Size)
      // processing time 處理時間,毫秒(對應WebUI: Processing Time)
      // scheduling delay 調度時間,毫秒(對應WebUI: Scheduling Delay)
      TRACE PIDRateEstimator: 
      time = 1558888897224, # records = 33, processing time = 24548, scheduling delay = 8
      預估新批次速率
      TRACE PIDRateEstimator: 
       latestRate = -1.0, error = -2.344305035033404
       latestError = -1.0, historicalError = 0.0010754440280267231
       delaySinceUpdate = 1.558888897225E9, dError = -8.623482003280801E-10
      第一次計算跳過速率估計
      TRACE PIDRateEstimator: First run, rate estimation skipped
      當前批次沒有記錄或沒有延遲則跳過速率估計
      TRACE PIDRateEstimator: Rate estimation skipped
      以新的預估速率運行
      TRACE PIDRateEstimator: New rate = 1.0

      WebUI

       

       可以看到,開啟反壓后,攝入速率Input Rate可以根據處理時間Processing Time來調整,從而保證應用的穩定性。

      posted on 2019-12-11 14:37  老董  閱讀(5535)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 中文字幕国产日韩精品| 中文字幕亚洲精品人妻| 国产日韩综合av在线| 日本无码欧美一区精品久久| 免费看欧美全黄成人片| 国产精品SM捆绑调教视频| 国产香蕉久久精品综合网| 欧美国产精品啪啪| 国产在线98福利播放视频| 91亚洲国产成人精品性色| 亚洲精品不卡av在线播放| 国产尤物精品自在拍视频首页| 国产精品有码在线观看| 久久久久青草线综合超碰| av无码久久久久不卡网站蜜桃 | 欧美成人h亚洲综合在线观看| 在线精品亚洲区一区二区| 国产一区二区内射最近更新| www插插插无码免费视频网站| 国产精品妇女一区二区三区| 少妇人妻偷人精品免费| 人人妻人人爽人人添夜夜欢视频| 97亚洲色欲色欲综合网| 国产视色精品亚洲一区二区| 久久精品国产亚洲av麻豆不卡| 国厂精品114福利电影免费| 人妻加勒比系列无码专区| 亚洲人成网站77777在线观看| 国产成人精品一区二区三区| 东京热人妻无码人av| 国产精品麻豆成人AV电影艾秋| 69人妻精品中文字幕| 国产精品推荐视频一区二区| 无码人妻aⅴ一区二区三区蜜桃| 亚洲av专区一区| 国内精品国产三级国产a久久| 亚洲精品香蕉一区二区| 男人的天堂av社区在线| 国产精品高清国产三级囯产AV| 国内精品极品久久免费看| 久久精品免视看国产成人|