<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      RDD概念、特性、緩存策略與容錯

      一、RDD概念與特性

      1. RDD的概念

        RDD(Resilient Distributed Dataset),是指彈性分布式數(shù)據(jù)集。數(shù)據(jù)集:Spark中的編程是基于RDD的,將原始數(shù)據(jù)加載到內(nèi)存變成RDD,RDD再經(jīng)過若干次轉(zhuǎn)化,仍為RDD。分布式:讀數(shù)據(jù)一般都是從分布式系統(tǒng)中去讀,如hdfs、kafka等,所以原始文件存在磁盤是分布式的,spark加載完數(shù)據(jù)的RDD也是分布式的,換句話說RDD是抽象的概念,實際數(shù)據(jù)仍在分布式文件系統(tǒng)中;因為有了RDD,在開發(fā)代碼過程會非常方便,只需要將原始數(shù)據(jù)理解為一個集合,然后對集合進行操作即可。RDD里面每一塊數(shù)據(jù)/partition,分布在某臺機器的物理節(jié)點上,這是物理概念。彈性:這里是指數(shù)據(jù)集會進行轉(zhuǎn)換,所以會忽大忽小,partition數(shù)量忽多忽少。

      2. RDD的特性

        Spark-1.6.1源碼在org.apache.spark.rdd下的RDD.scala指出了每一個RDD都具有五個主要特點,如下:

      • A list of partion

        RDD是由一組partition組成。例如要讀取hdfs上的文本文件的話,可以使用textFile()方法把hdfs的文件加載過來,把每臺機器的數(shù)據(jù)放到partition中,并且封裝了一個HadoopRDD,這就是一個抽象的概念。每一個partition都對應(yīng)了機器中的數(shù)據(jù)。因為在hdfs中的一個Datanode,有很多的block,讀機器的數(shù)據(jù)時,會將每一個block變成一個partition,與MapReduce中split的大小由min split,max split,block size (max(min split, min(max split, block size)))決定的相同,spark中的partition大小實際上對應(yīng)了一個split的大小。經(jīng)過轉(zhuǎn)化,HadoopRDD會轉(zhuǎn)成其他RDD,如FilteredRDD、PairRDD等,但是partition還是相應(yīng)的partition,只是因為有函數(shù)應(yīng)用里面的數(shù)據(jù)變化了。

      • A function for computing each split

        對每個split(partition)都有函數(shù)操作。一個函數(shù)應(yīng)用在一個RDD上,可以理解為一個函數(shù)對集合(RDD)內(nèi)的每個元素(split)的操作。

      • A list of dependencies on other RDDs

        一個RDD依賴于一組RDD。例如,下列代碼片段

      val lines=sc.textFlie("hdfs://namenode:8020/path/file.txt")
      val wc=lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2)
      wc.foreach(println)
      sc.stop()

        這里就存在RDD的依賴關(guān)系。

      • Optionally, a Partitioner for key-value RDDs

        該可選項意思是對于一個RDD,如果其中的每一個元素是Key-Value形式時,可以傳一個Partitioner(自定義分區(qū)),讓這個RDD重新分區(qū)。這種情況的本質(zhì)是shuffle,多點到多點的數(shù)據(jù)傳輸。

      • Optionally, a list of preferred locations to compute each split on

        textFile()過程中,可以指定加載到性能好的機器中。例如,hdfs中的數(shù)據(jù)可能放在一大堆破舊的機器上,hdfs數(shù)據(jù)在磁盤上,磁盤可能很大,CPU、內(nèi)存的性能很差。Spark默認做的事情是,把數(shù)據(jù)加載進來,會把數(shù)據(jù)抽象成一個RDD,抽象進來的數(shù)據(jù)在內(nèi)存中,這內(nèi)存指的是本機的內(nèi)存,這是因為在分布式文件系統(tǒng)中,要遵循數(shù)據(jù)本地性原則,即移動計算(把函數(shù)、jar包發(fā)過去)而不移動數(shù)據(jù)(移動數(shù)據(jù)成本較高)。而一般hdfs的集群機器的內(nèi)存比較差,如果要把這么多數(shù)據(jù)加載到爛機器的內(nèi)存中,會存在問題,一是內(nèi)存可能裝不下,二是CPU差、計算能力差,這就等于沒有發(fā)揮出spark的性能。在這種情況下,Spark的RDD可以提供一個可選項,可以指定一個preferred locations,即指定一個位置來加載數(shù)據(jù)。這樣就可以指定加載到性能好的機器去計算。例如,可以將hdfs數(shù)據(jù)加載到Tachyon內(nèi)存文件系統(tǒng)中,然后再基于Tachyon來做spark程序。 

      二、RDD緩存策略

      1. 源碼

        源碼org.apache.spark.storage包下的StorageLevel.scala中定義緩存策略

        StorageLevel類默認的構(gòu)造器有五個屬性,如下圖所示:

       

       

      2. 源碼解讀

      • StorageLevel私有類的構(gòu)造器
      class StorageLevel private(
          private var _useDisk: Boolean,/*使用磁盤*/
          private var _useMemory: Boolean,/*使用內(nèi)存*/
          private var _useOffHeap: Boolean,/*不使用堆內(nèi)存(堆在JVM中)*/
          private var _deserialized: Boolean,/*不序列化*/
          private var _replication: Int = 1)/*副本數(shù),默認為1*/
      • NONE

        val NONE = new StorageLevel(false, false, false, false)

        NONE表示不需要緩存。(不使用磁盤,不用內(nèi)存,使用堆,序列化)

      • DISK_ONLY

        val DISK_ONLY = new StorageLevel(true, false, false, false)

        DISK_ONLY表示使用磁盤。(使用磁盤,不用內(nèi)存,使用堆,序列化)

      • DISK_ONLY_2

        val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)

        DISK_ONLY_2表示使用磁盤,兩個副本。(使用磁盤,不用內(nèi)存,使用堆,序列化,2)

      • MEMORY_ONLY

        val MEMORY_ONLY = new StorageLevel(false, true, false, true)

        MEMORY_ONLY表示只使用內(nèi)存,例如1G的數(shù)據(jù)要放入512M的內(nèi)存,會將數(shù)據(jù)切成兩份,先將512M加載到內(nèi)存,剩下的512M還在原來位置(如hdfs),之后如果有RDD的運算,會從內(nèi)存和磁盤中去找各自的512M數(shù)據(jù)。(不使用磁盤,使用內(nèi)存,使用堆,不序列化)

      • MEMORY_ONLY_2

        val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)

        MEMORY_ONLY_2表示只使用內(nèi)存,2個副本。(不使用磁盤,使用內(nèi)存,使用堆,不序列化,2)

      • MEMORY_ONLY_SER

        val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)

        MEMORY_ONLY_SER表示只使用內(nèi)存,序列化。(不使用磁盤,使用內(nèi)存,使用堆,序列化)

      • MEMORY_ONLY_SER_2

        val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)

        MEMORY_ONLY_SER表示只使用內(nèi)存,序列化,2個副本。(不使用磁盤,使用內(nèi)存,使用堆,序列化,2)

      • MEMORY_AND_DISK

        val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)

        MEMORY_AND_DISK和MEMORY_ONLY很類似,都使用到了內(nèi)存和磁盤,只是使用的是本機本地磁盤,例如1G數(shù)據(jù)要加載到512M的內(nèi)存中,首先將hdfs的1G數(shù)據(jù)的512M加載到內(nèi)存,另外的512M加載到本地的磁盤緩存著(和hdfs就沒有關(guān)系了),RDD要讀取數(shù)據(jù)的話就在內(nèi)存和本地磁盤中找。(使用磁盤,使用內(nèi)存,使用堆,不序列化)

      • MEMORY_AND_DISK_2

        val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)

        MEMORY_AND_DISK_2表示兩個副本。(使用磁盤,使用內(nèi)存,使用堆,不序列化,2)

      • MEMORY_AND_DISK_SER

        val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)

        MEMORY_AND_DISK_SER本地內(nèi)存和磁盤,序列化。序列化的好處在于可以壓縮,但是壓縮就意味著要解壓縮,需要消耗一些CPU。

      • MEMORY_AND_DISK_SER_2

        val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)

        MEMORY_AND_DISK_SER2,兩個副本。

      • OFF_HEAP

        val OFF_HEAP = new StorageLevel(false, false, true, false)

        OFF_HEAP不使用堆內(nèi)存(例如可以使用Tachyon的分布式內(nèi)存文件系統(tǒng))。(不使用磁盤,不用內(nèi)存,不使用堆,序列化)

      3. 緩存策略試驗

      • 不緩存
      package com.huidoo.spark
      
      import org.apache.spark.{SparkConf, SparkContext}
      
      object TestCache {
        def main(args: Array[String]): Unit = {
          val conf = new SparkConf().setAppName("TestCache").setMaster("local[2]")
          val sc = new SparkContext(conf)
          val lines = sc.textFile("hdfs://cdh01:8020/flume/2018-03-23/2230") //目錄下有17個文件,總大小約為335MB,不做緩存
      
          val beginTime1 = System.currentTimeMillis() //記錄第1個job開始時間
          val count1 = lines.count() //調(diào)用count()方法,會產(chǎn)生一個job
          val endTime1 = System.currentTimeMillis() //記錄第1個job結(jié)束時間
      
          val beginTime2 = System.currentTimeMillis() //記錄第2個job開始時間
          val count2 = lines.count() //調(diào)用count()方法,會產(chǎn)生一個job
          val endTime2 = System.currentTimeMillis() //記錄第2個job結(jié)束時間
      
          println(count1)
          println("第1個job總共消耗時間" + (endTime1 - beginTime1) + "毫秒")
          println(count2)
          println("第2個job總共消耗時間" + (endTime2 - beginTime2) + "毫秒")
          sc.stop()
        }
      }

        運行結(jié)果如下:

        可見,所有文件的總行數(shù)為1935077行,第一個job和第二個job的用時分別為14.7s和12.2s,差別不大。

      • 緩存 

         只需在原代碼基礎(chǔ)上將HadoopRDD lines添加調(diào)用cache()方法即可。

      val lines = sc.textFile("hdfs://cdh01:8020/flume/2018-03-23/2230").cache() //目錄下有17個文件,總大小約為335MB,做緩存

         運行結(jié)果如下:

        可見,所有文件的總行數(shù)為1935077行,第一個job和第二個job的用時分別為19.4s和0.09s,速度相比不做緩存明顯提升。這是因為沒有做緩存,第二個job還需要先從hdfs上讀取數(shù)據(jù),需要消耗更長時間;而做了緩存則直接從緩存中讀取(cache方法默認緩存策略是MEMORY_ONLY),所以速度會快很多。 

      三、RDD Lineage與容錯

      1. Lineage(血統(tǒng))

         一系列RDD到RDD的transformation操作,稱為lineage(血統(tǒng))。某個RDD依賴于它前面的所有RDD。例如一個由10個RDD到RDD的轉(zhuǎn)化構(gòu)成的lineage,如果在計算到第9個RDD時失敗了,一般較好的計算框架會自動重新計算。一般地,這種錯誤發(fā)生了會去找上一個RDD,但是實際上如果不做緩存是找不到的,因為即使RDD9知道它是由RDD8轉(zhuǎn)化過來的,但是因為它并沒有存RDD數(shù)據(jù)本身,在內(nèi)存中RDD瞬時轉(zhuǎn)化,瞬間就會在內(nèi)存中消失,所以還是找不到數(shù)據(jù)。如果這時RDD8做過cache緩存,那么就是在RDD8的時候進行了數(shù)據(jù)的保存并記錄了位置,這時如果RDD9失敗了就會從緩存中讀取RDD8的數(shù)據(jù);如果RDD8沒有做cache就會找RDD7,以此類推,如果都沒有做cache就需要重新從HDFS中讀取數(shù)據(jù)。所以所謂的容錯就是指,當計算過程復雜,為了降低因某些關(guān)鍵點計算出錯而需要重新計算的帶來的慘重代價的風險,則需要在某些關(guān)鍵點使用cache或用persist方法做一下緩存。

      2. 容錯

      • 容錯理論

        上述緩存策略還存在一個問題。使用cache或persist的緩存策略是使用默認的僅在內(nèi)存,所以實際的RDD緩存位置是在內(nèi)存當中,如果機器出現(xiàn)問題,也會造成內(nèi)存中的緩存RDD數(shù)據(jù)丟失。所以可以將要做容錯的RDD數(shù)據(jù)存到指定磁盤(可以是hdfs)路徑中,可以對RDD做doCheckpoint()方法。使用doCheckpoint()方法的前提示,需要在sc中要先設(shè)置SparkContext.setCheckpointDir(),設(shè)置數(shù)據(jù)存儲路徑。這時候如果程序計算過程中出錯了,會先到cache中找緩存數(shù)據(jù),如果cache中沒有就會到設(shè)置的磁盤路徑中找。

        在RDD計算,通過checkpoint進行容錯,做checkpoint有兩種方式,一個是checkpoint data,一個是logging the updates。用戶可以控制采用哪種方式來實現(xiàn)容錯,默認是logging the updates方式,通過記錄跟蹤所有生成RDD的轉(zhuǎn)換(transformations)也就是記錄每個RDD的lineage(血統(tǒng))來重新計算生成丟失的分區(qū)數(shù)據(jù)。 

      • 容錯源碼解讀
      //RDD.scala中的doCheckpoint方法:
      /**
       * Performs the checkpointing of this RDD by saving this. It is called after a job using this RDD
       * has completed (therefore the RDD has been materialized and potentially stored in memory).
       * doCheckpoint() is called recursively on the parent RDDs.
       */
      private[spark] def doCheckpoint(): Unit = {
        
        RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {
          //如果doCheckpointCalled不為true,就先將其改為true
          if (!doCheckpointCalled) {
            doCheckpointCalled = true
            //如果checkpointData已定義,就把data get出來,然后做一下checkpoint。
            if (checkpointData.isDefined) {
              checkpointData.get.checkpoint()
            } else {
              //如果checkpointData沒有的話,就把這個RDD的所有依賴拿出來,foreach一把,把里面的每個元素RDD,再遞歸調(diào)用本方法。
              dependencies.foreach(_.rdd.doCheckpoint())
            }
          }
        }
      }
      //RDD.scala中的checkpoint()方法
      def checkpoint(): Unit = RDDCheckpointData.synchronized {
        // NOTE: we use a global lock here due to complexities downstream with ensuring
        // children RDD partitions point to the correct parent partitions. In the future
        // we should revisit this consideration.
        //首先檢查context的checkpointDir是否為空,如果沒有設(shè)置就會拋出異常
        if (context.checkpointDir.isEmpty) {
          throw new SparkException("Checkpoint directory has not been set in the SparkContext")
        } else if (checkpointData.isEmpty) {
          checkpointData = Some(new ReliableRDDCheckpointData(this))
        }
      }
      //SparkContext.scala中的setCheckpointDir方法
      /**
       * Set the directory under which RDDs are going to be checkpointed. The directory must
       * be a HDFS path if running on a cluster.
       */
      def setCheckpointDir(directory: String) {
      
        // If we are running on a cluster, log a warning if the directory is local.
        // Otherwise, the driver may attempt to reconstruct the checkpointed RDD from
        // its own local file system, which is incorrect because the checkpoint files
        // are actually on the executor machines.
        //如果運行了集群模式,checkpointDir必須是非本地的。
        if (!isLocal && Utils.nonLocalPaths(directory).isEmpty) {
          logWarning("Checkpoint directory must be non-local " +
            "if Spark is running on a cluster: " + directory)
        }
      
        checkpointDir = Option(directory).map { dir =>
          val path = new Path(dir, UUID.randomUUID().toString)
          val fs = path.getFileSystem(hadoopConfiguration)
          fs.mkdirs(path)
          fs.getFileStatus(path).getPath.toString
        }
      }
      posted @ 2018-04-01 15:16  Huidoo_Yang  閱讀(1357)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 平度市| 久久成人国产精品免费软件| 亚洲日本精品一区二区| 最新中文字幕国产精品| 国产午夜A理论毛片| 久久永久视频| 日韩在线一区二区每天更新| 日韩在线视频一区二区三区| 一区二区三区精品自拍视频| 香港特级三A毛片免费观看| 日韩一欧美内射在线观看| 国产精品三级一区二区三区| 国产精品一区 在线播放| 国产午夜一区二区在线观看| 91中文字幕一区在线| 国产v综合v亚洲欧美久久| 亚洲色婷婷婷婷五月基地| 国产精品国三级国产av| 高颜值午夜福利在线观看| 色国产视频| 久久人人97超碰精品| 深夜福利资源在线观看| 无码免费中文字幕视频| 国产精品播放一区二区三区 | 日韩精品福利一区二区三区| 91密桃精品国产91久久| 福利在线视频一区二区| 亚洲老熟女一区二区三区| 中文字幕国产原创国产| 欧美最猛性xxxxx大叫| 日韩精品卡1卡2日韩在线| 欧美丝袜高跟鞋一区二区| 97精品亚成在人线免视频| 视频一区视频二区制服丝袜 | 午夜福利影院不卡影院| 亚洲成av人片一区二区| 亚洲欧美偷国产日韩| 精品无套挺进少妇内谢| 精品国产AⅤ无码一区二区| 久久精品国产再热青青青| 亚洲国产一区二区精品专|