<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      spark數(shù)據(jù)分區(qū)數(shù)量的原理

       

       

      原始RDD或數(shù)據(jù)集中的每一個(gè)分區(qū)都映射一個(gè)或多個(gè)數(shù)據(jù)文件, 該映射是在文件的一部分或者整個(gè)文件上完成的。

      Spark Job RDD/datasets在執(zhí)行管道中,通過根據(jù)分區(qū)到數(shù)據(jù)文件的映射讀取數(shù)據(jù)輸入到RDD/dataset。

      如何根據(jù)某些參數(shù)確定spark的分區(qū)數(shù)?

       


       

      使用Dataset APIs讀取數(shù)據(jù)的分區(qū)數(shù):

      functions:

      https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/DataFrameReader.html

      *文件格式 APIs*
      Dataset<Row> = SparkSession.read.csv(...)
      Dataset<Row> = SparkSession.read.json(...)
      Dataset<Row> = SparkSession.read.text(...)
      Dataset<Row> = SparkSession.read.parquet(...)
      Dataset<Row> = SparkSession.read.orc(...)
      
      *通用格式 API*
      Dataset<Row> = SparkSession.read.format(String fileformat).load(...)

       

      影響數(shù)據(jù)分區(qū)數(shù)的參數(shù):

      (a)spark.default.parallelism (default: Total No. of CPU cores)
      (b)spark.sql.files.maxPartitionBytes (default: 128 MB) 【讀取文件時(shí)打包到單個(gè)分區(qū)中的最大字節(jié)數(shù)?!?br>(c)spark.sql.files.openCostInBytes (default: 4 MB)  【 該參數(shù)默認(rèn)4M,表示小于4M的小文件會(huì)合并到一個(gè)分區(qū)中,用于減小小文件,防止太多單個(gè)小文件占一個(gè)分區(qū)情況。這個(gè)參數(shù)就是合并小文件的閾值,小于這個(gè)閾值的文件將會(huì)合并?!?/p>

       

      使用這些配置參數(shù)值,一個(gè)名為maxSplitBytes的最大分割準(zhǔn)則被計(jì)算如下:

       

      maxSplitBytes = Minimum(maxPartitionBytes, bytesPerCore)

       

      bytesPerCore = (文件總大小 + 文件個(gè)數(shù) * openCostInBytes)/ default.parallelism

       

      maxSplitBytes: 

      for each_file in files:
          if each_file is can split:
              if each_file.size() > maxSplitBytes:
                  # file 被切分為 block_number 塊其中block_number-1大小為 maxSplitBytes,1塊<=maxSplitBytes
                  block_number = ceil(each_file.size() / maxSplitBytes)
              else:
                  block_number = 1
          else:
              #文件不可分
              block_number = 1

       

      數(shù)據(jù)文件計(jì)算文件塊之后,將一個(gè)或多個(gè)文件塊打包到一個(gè)分區(qū)中。

      打包過程從初始化一個(gè)空分區(qū)開始,然后對每個(gè)文件塊進(jìn)行迭代:

      1. 如果沒有當(dāng)前分區(qū)要打包,請初始化要打包的新分區(qū),然后將迭代的文件塊分配給該分區(qū)。 分區(qū)大小成為塊大小與“ openCostInBytes”的額外開銷的總和。

      2.如果塊大小的增加不超過當(dāng)前分區(qū)(正在打包)的大小超過' maxSplitBytes ',那么文件塊將成為當(dāng)前分區(qū)的一部分。分區(qū)大小是由塊大小和“openCostInBytes”額外開銷的總和增加的。

      3.如果塊大小的增加超過了當(dāng)前分區(qū)被打包的大小超過了' maxSplitBytes ',那么當(dāng)前分區(qū)被聲明為完整并啟動(dòng)一個(gè)新分區(qū)。迭代的文件塊成為正在初始化的新分區(qū)的一部分,而新分區(qū)大小成為塊大小和‘openCostInBytes’額外開銷的總和。

      打包過程結(jié)束后,將獲得用于讀取相應(yīng)數(shù)據(jù)文件的數(shù)據(jù)集的分區(qū)數(shù)。

       

       

       

       

       

      盡管獲得分區(qū)數(shù)量的過程似乎有點(diǎn)復(fù)雜,但基本的思想是,如果文件是可分拆的,那么首先在maxSplitBytes邊界處拆分單個(gè)文件。

      在此之后,將文件的分割塊或不可分割的文件打包到一個(gè)分區(qū)中,這樣,在將塊打包到一個(gè)分區(qū)中期間,

      如果分區(qū)大小超過maxSplitBytes,則認(rèn)為該分區(qū)已經(jīng)打包完成,然后采用一個(gè)新分區(qū)進(jìn)行打包。因此,最終從包裝過程中得到一定數(shù)量的分區(qū)。

      e.g:

      core設(shè)置為10

      (a) 54 parquet files, 65 MB each, 默認(rèn)參數(shù) 。  

      bytesPerCore = (54*65 + 54 * 4)/ 10 = 372M
      
      maxSplitBytes = Minimum(maxPartitionBytes, bytesPerCore) = min(128M,372M)=128
      
      65 < 128 && 2*65 > 128 ==> 54分區(qū)

       

      (b)54 parquet files, 63 MB each, 默認(rèn)參數(shù)。

      bytesPerCore = (54*63 + 54 * 4)/ 10 = 361M
      
      maxSplitBytes = Minimum(maxPartitionBytes, bytesPerCore) = min(128M,361M)=128
      
      63 < 128 &&   4 + 2*63=126+4=130 > 128=maxPartitionBytes  ==> 54 (看起來 1分區(qū)可以容納2個(gè)塊,但是存在一個(gè)openCostInBytes開銷4M,2個(gè)63+4大于了 128M,故一個(gè)分區(qū)只能一個(gè)塊)

       

      (c)54 parquet files, 40 MB each, 默認(rèn)參數(shù)。

      bytesPerCore = (54*40 + 54 * 4)/ 10 = 237M
      
      maxSplitBytes = Minimum(maxPartitionBytes, bytesPerCore) = min(128M,237M)=128
      
      40 < 128 && (4+3*  40) = 124 < 128 (故一個(gè)分區(qū)可以裝3個(gè)塊) = 54/3 = 18分區(qū)

       

      (d)54 parquet files, 40 MB each, maxPartitionBytes=88M 其余默認(rèn)

      bytesPerCore = (54*40 + 54 * 4)/ 10 = 237M
      
      maxSplitBytes = Minimum(maxPartitionBytes, bytesPerCore) = min(88M,237M)=88
      
      40 < 88 && (4+2*40) = 84 < 88 (一個(gè)分區(qū)2個(gè)) = 27個(gè)分區(qū)

       

      (e) 54 parquet files, 40 MB each ; spark.default.parallelism set to 400

      bytesPerCore = (54*40 + 54 * 4)/ 400 = 5.94M
      
      maxSplitBytes = Minimum(maxPartitionBytes, bytesPerCore) = min(128M,5.94M)=5.94
      
      每個(gè)文件塊數(shù):ceil(40 / 5.94) = 7個(gè)   5.94 + 4M  > 5.94  一個(gè)分區(qū)一個(gè)塊
      
      所以總分區(qū)數(shù)為:  54 * 7 = 378 個(gè)分區(qū) 

       

       

       


       使用RDD APIs讀取數(shù)據(jù)文件的分區(qū)數(shù)

       

      RDD APIs類似下面的API

      *SparkContext.newAPIHadoopFile(String path, Class<F> fClass, Class<K> kClass, Class<V> vClass, org.apache.hadoop.conf.Configuration conf)
      *SparkContext.textFile(String path, int minPartitions)
      *SparkContext.sequenceFile(String path, Class<K> keyClass, Class<V> valueClass)
      *SparkContext.sequenceFile(String path, Class<K> keyClass, Class<V> valueClass, int minPartitions)
      *SparkContext.objectFile(String path, int minPartitions, scala.reflect.ClassTag<T> evidence$4)

      在這些API中,會(huì)詢問參數(shù)' minPartitions ',而在另一些API中則沒有。如果沒有查詢,則默認(rèn)值為2或1,1(默認(rèn)情況下為1)。并行性是1。這個(gè)“minPartitions”是決定這些api返回的RDD中分區(qū)數(shù)量的因素之一。其他因素為Hadoop配置參數(shù)的值:

      # 關(guān)于 mapred.min.split.size see  https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/DeprecatedProperties.html

      # 關(guān)于 dfs.blocksize  see https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/hdfs-default.xml 默認(rèn): 128M

      minSize (mapred.min.split.size - default value 1 MB) or  minSize (mapreduce.input.fileinputformat.split.minsize - default value 1 MB) 

      blockSize (dfs.blocksize - default 128 MB)

       

      goalSize = Sum of all files lengths to be read / minPartitions

      splitSize = Math.max(minSize, Math.min(goalSize, blockSize));
       

      現(xiàn)在使用“splitSize”,

      for each_file in files:
          if each_file:
              if each_file.size() > splitSize:
                  # file 被切分為 block_number 塊其中block_number-1大小為 splitSize,最后一個(gè)塊<=splitSize
                  block_number = ceil(each_file.size() / maxSplitBytes)
              else:
                  # 大小等于文件長度的文件塊
                  block_number = 1
                  block_size  = 文件長度的文件塊
          else:
              #文件不可分
              block_number = 1
              block_size = 文件長度的文件塊

      每個(gè)文件塊(大小大于0)都映射到單個(gè)分區(qū)。因此,由數(shù)據(jù)文件上的RDD api返回的RDD中的分區(qū)數(shù),等于使用“splitSize”對數(shù)據(jù)文件進(jìn)行切片而得到的非零文件塊的數(shù)

       

       

       

      e.g:

      (a). 31 parquet files, 330 MB each, blocksize at default 128 MB, minPartitions not specified, core is 10

      splitSize = Math.max(minSize, Math.min(Sum of all files lengths to be read / minPartitions, blockSize))  = max( 0 , 128M) = 128M 
      
      一個(gè)文件按照splitSize=128M可以分3個(gè),故一共分區(qū)數(shù) 31*3=93

       

      (b). 54 parquet files, 40 MB each, blocksize at default 128 MB,  core is 10

      splitSize = Math.max(minSize, Math.min(Sum of all files lengths to be read / minPartitions, blockSize))  = max( 0 , min(2264924160/1,128M)) = 128M 
      
      splitSize=128M ,40 <128 1個(gè)文件長度的文件塊 故為54個(gè)分區(qū)

       

      (c) 31 parquet files, 330 MB each, blocksize at default 128 MB, minPartitions specified as 1000

      splitSize = Math.max(minSize, Math.min(Sum of all files lengths to be read / minPartitions, blockSize))  = max( 0 , min(31 * 330 * 1024 * 1024/1000 ,128 * 1024 * 1024)) = 10726932 = 10.23M
      
       一個(gè)文件分為 ceil(330/10.23) = 33塊  共計(jì):31 * 33 = 1023 共計(jì)分區(qū): 1023個(gè)

       

      (d)  31 parquet files, 330 MB each, blocksize at default 128 MB, minPartitions not specified, ‘mapred.min.split.size’ set at 256 MB, No. of core equal to 10

      splitSize = Math.max(minSize, Math.min(Sum of all files lengths to be read / minPartitions, blockSize))  = max( 256 , min(31 * 330 * 1024 * 1024/1 ,128 * 1024 * 1024)) =256M
      
      330/256.0 = 2 , 31 * 2 = 62個(gè)分區(qū)

       

       


       

      總結(jié):

           分區(qū)的最佳數(shù)量是高效可靠的Spark應(yīng)用程序的關(guān)鍵。

      posted @ 2020-06-18 12:04  similarface  閱讀(2386)  評(píng)論(0)    收藏  舉報(bào)
      主站蜘蛛池模板: 国产香蕉九九久久精品免费| 国产精品成人自产拍在线| 人妻日韩精品中文字幕| 成人免费毛片aaaaaa片| 亚洲第一无码AV无码专区| 色吊丝免费av一区二区| 欧美高清精品一区二区| 2021亚洲国产精品无码| 手机看片福利一区二区三区 | 亚洲男女羞羞无遮挡久久丫| 国产一区二区三区不卡视频| 久久青青草原精品国产app| 欧美做受视频播放| 亚洲国产成人无码影院| 综艺| 午夜精品一区二区三区成人| 日本区二区三区不卡视频| 欧美人与动zozo在线播放| 国产乱弄免费视频观看| 国内综合精品午夜久久资源| 精品国产一区二区三区大| 二连浩特市| 又湿又紧又大又爽A视频男| 加勒比无码人妻东京热| 精品国产一区二区三区四区阿崩 | 亂倫近親相姦中文字幕| 国产成人高清亚洲综合| 国产中文字幕精品免费| 亚洲精品久久久久国产| 中文字幕久久人妻熟人妻| 亚洲午夜无码久久久久蜜臀av| 免费观看日本污污ww网站 | 99欧美日本一区二区留学生| 神农架林区| 少妇午夜福利一区二区三区| 最新亚洲av日韩av二区| 99视频偷窥在线精品国自产拍| 国产精品一区二区传媒蜜臀| 奇米四色7777中文字幕| 精品无码久久久久久久动漫| 精品人妻中文字幕有码在线|