spark數(shù)據(jù)分區(qū)數(shù)量的原理
原始RDD或數(shù)據(jù)集中的每一個(gè)分區(qū)都映射一個(gè)或多個(gè)數(shù)據(jù)文件, 該映射是在文件的一部分或者整個(gè)文件上完成的。
Spark Job RDD/datasets在執(zhí)行管道中,通過根據(jù)分區(qū)到數(shù)據(jù)文件的映射讀取數(shù)據(jù)輸入到RDD/dataset。
如何根據(jù)某些參數(shù)確定spark的分區(qū)數(shù)?
使用Dataset APIs讀取數(shù)據(jù)的分區(qū)數(shù):
functions:
https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/DataFrameReader.html
*文件格式 APIs* Dataset<Row> = SparkSession.read.csv(...) Dataset<Row> = SparkSession.read.json(...) Dataset<Row> = SparkSession.read.text(...) Dataset<Row> = SparkSession.read.parquet(...) Dataset<Row> = SparkSession.read.orc(...) *通用格式 API* Dataset<Row> = SparkSession.read.format(String fileformat).load(...)
影響數(shù)據(jù)分區(qū)數(shù)的參數(shù):
(a)spark.default.parallelism (default: Total No. of CPU cores)
(b)spark.sql.files.maxPartitionBytes (default: 128 MB) 【讀取文件時(shí)打包到單個(gè)分區(qū)中的最大字節(jié)數(shù)?!?br>(c)spark.sql.files.openCostInBytes (default: 4 MB) 【 該參數(shù)默認(rèn)4M,表示小于4M的小文件會(huì)合并到一個(gè)分區(qū)中,用于減小小文件,防止太多單個(gè)小文件占一個(gè)分區(qū)情況。這個(gè)參數(shù)就是合并小文件的閾值,小于這個(gè)閾值的文件將會(huì)合并?!?/p>
使用這些配置參數(shù)值,一個(gè)名為maxSplitBytes的最大分割準(zhǔn)則被計(jì)算如下:
maxSplitBytes = Minimum(maxPartitionBytes, bytesPerCore)
bytesPerCore = (文件總大小 + 文件個(gè)數(shù) * openCostInBytes)/ default.parallelism
maxSplitBytes:
for each_file in files: if each_file is can split: if each_file.size() > maxSplitBytes: # file 被切分為 block_number 塊其中block_number-1大小為 maxSplitBytes,1塊<=maxSplitBytes block_number = ceil(each_file.size() / maxSplitBytes) else: block_number = 1 else: #文件不可分 block_number = 1
數(shù)據(jù)文件計(jì)算文件塊之后,將一個(gè)或多個(gè)文件塊打包到一個(gè)分區(qū)中。
打包過程從初始化一個(gè)空分區(qū)開始,然后對每個(gè)文件塊進(jìn)行迭代:
1. 如果沒有當(dāng)前分區(qū)要打包,請初始化要打包的新分區(qū),然后將迭代的文件塊分配給該分區(qū)。 分區(qū)大小成為塊大小與“ openCostInBytes”的額外開銷的總和。
2.如果塊大小的增加不超過當(dāng)前分區(qū)(正在打包)的大小超過' maxSplitBytes ',那么文件塊將成為當(dāng)前分區(qū)的一部分。分區(qū)大小是由塊大小和“openCostInBytes”額外開銷的總和增加的。
3.如果塊大小的增加超過了當(dāng)前分區(qū)被打包的大小超過了' maxSplitBytes ',那么當(dāng)前分區(qū)被聲明為完整并啟動(dòng)一個(gè)新分區(qū)。迭代的文件塊成為正在初始化的新分區(qū)的一部分,而新分區(qū)大小成為塊大小和‘openCostInBytes’額外開銷的總和。
打包過程結(jié)束后,將獲得用于讀取相應(yīng)數(shù)據(jù)文件的數(shù)據(jù)集的分區(qū)數(shù)。

盡管獲得分區(qū)數(shù)量的過程似乎有點(diǎn)復(fù)雜,但基本的思想是,如果文件是可分拆的,那么首先在maxSplitBytes邊界處拆分單個(gè)文件。
在此之后,將文件的分割塊或不可分割的文件打包到一個(gè)分區(qū)中,這樣,在將塊打包到一個(gè)分區(qū)中期間,
如果分區(qū)大小超過maxSplitBytes,則認(rèn)為該分區(qū)已經(jīng)打包完成,然后采用一個(gè)新分區(qū)進(jìn)行打包。因此,最終從包裝過程中得到一定數(shù)量的分區(qū)。
e.g:
core設(shè)置為10
(a) 54 parquet files, 65 MB each, 默認(rèn)參數(shù) 。
bytesPerCore = (54*65 + 54 * 4)/ 10 = 372M
maxSplitBytes = Minimum(maxPartitionBytes, bytesPerCore) = min(128M,372M)=128
65 < 128 && 2*65 > 128 ==> 54分區(qū)
(b)54 parquet files, 63 MB each, 默認(rèn)參數(shù)。
bytesPerCore = (54*63 + 54 * 4)/ 10 = 361M
maxSplitBytes = Minimum(maxPartitionBytes, bytesPerCore) = min(128M,361M)=128
63 < 128 && 4 + 2*63=126+4=130 > 128=maxPartitionBytes ==> 54 (看起來 1分區(qū)可以容納2個(gè)塊,但是存在一個(gè)openCostInBytes開銷4M,2個(gè)63+4大于了 128M,故一個(gè)分區(qū)只能一個(gè)塊)
(c)54 parquet files, 40 MB each, 默認(rèn)參數(shù)。
bytesPerCore = (54*40 + 54 * 4)/ 10 = 237M
maxSplitBytes = Minimum(maxPartitionBytes, bytesPerCore) = min(128M,237M)=128
40 < 128 && (4+3* 40) = 124 < 128 (故一個(gè)分區(qū)可以裝3個(gè)塊) = 54/3 = 18分區(qū)
(d)54 parquet files, 40 MB each, maxPartitionBytes=88M 其余默認(rèn)
bytesPerCore = (54*40 + 54 * 4)/ 10 = 237M
maxSplitBytes = Minimum(maxPartitionBytes, bytesPerCore) = min(88M,237M)=88
40 < 88 && (4+2*40) = 84 < 88 (一個(gè)分區(qū)2個(gè)) = 27個(gè)分區(qū)
(e) 54 parquet files, 40 MB each ; spark.default.parallelism set to 400
bytesPerCore = (54*40 + 54 * 4)/ 400 = 5.94M maxSplitBytes = Minimum(maxPartitionBytes, bytesPerCore) = min(128M,5.94M)=5.94 每個(gè)文件塊數(shù):ceil(40 / 5.94) = 7個(gè) 5.94 + 4M > 5.94 一個(gè)分區(qū)一個(gè)塊 所以總分區(qū)數(shù)為: 54 * 7 = 378 個(gè)分區(qū)
使用RDD APIs讀取數(shù)據(jù)文件的分區(qū)數(shù)
RDD APIs類似下面的API
*SparkContext.newAPIHadoopFile(String path, Class<F> fClass, Class<K> kClass, Class<V> vClass, org.apache.hadoop.conf.Configuration conf) *SparkContext.textFile(String path, int minPartitions) *SparkContext.sequenceFile(String path, Class<K> keyClass, Class<V> valueClass) *SparkContext.sequenceFile(String path, Class<K> keyClass, Class<V> valueClass, int minPartitions) *SparkContext.objectFile(String path, int minPartitions, scala.reflect.ClassTag<T> evidence$4)
在這些API中,會(huì)詢問參數(shù)' minPartitions ',而在另一些API中則沒有。如果沒有查詢,則默認(rèn)值為2或1,1(默認(rèn)情況下為1)。并行性是1。這個(gè)“minPartitions”是決定這些api返回的RDD中分區(qū)數(shù)量的因素之一。其他因素為Hadoop配置參數(shù)的值:
# 關(guān)于 mapred.min.split.size see https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/DeprecatedProperties.html
# 關(guān)于 dfs.blocksize see https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/hdfs-default.xml 默認(rèn): 128M
minSize (mapred.min.split.size - default value 1 MB) or minSize (mapreduce.input.fileinputformat.split.minsize - default value 1 MB)
blockSize (dfs.blocksize - default 128 MB)
goalSize = Sum of all files lengths to be read / minPartitions
splitSize = Math.max(minSize, Math.min(goalSize, blockSize));
現(xiàn)在使用“splitSize”,
for each_file in files: if each_file: if each_file.size() > splitSize: # file 被切分為 block_number 塊其中block_number-1大小為 splitSize,最后一個(gè)塊<=splitSize block_number = ceil(each_file.size() / maxSplitBytes) else: # 大小等于文件長度的文件塊 block_number = 1 block_size = 文件長度的文件塊 else: #文件不可分 block_number = 1 block_size = 文件長度的文件塊
每個(gè)文件塊(大小大于0)都映射到單個(gè)分區(qū)。因此,由數(shù)據(jù)文件上的RDD api返回的RDD中的分區(qū)數(shù),等于使用“splitSize”對數(shù)據(jù)文件進(jìn)行切片而得到的非零文件塊的數(shù)

e.g:
(a). 31 parquet files, 330 MB each, blocksize at default 128 MB, minPartitions not specified, core is 10
splitSize = Math.max(minSize, Math.min(Sum of all files lengths to be read / minPartitions, blockSize)) = max( 0 , 128M) = 128M
一個(gè)文件按照splitSize=128M可以分3個(gè),故一共分區(qū)數(shù) 31*3=93
(b). 54 parquet files, 40 MB each, blocksize at default 128 MB, core is 10
splitSize = Math.max(minSize, Math.min(Sum of all files lengths to be read / minPartitions, blockSize)) = max( 0 , min(2264924160/1,128M)) = 128M
splitSize=128M ,40 <128 1個(gè)文件長度的文件塊 故為54個(gè)分區(qū)
(c) 31 parquet files, 330 MB each, blocksize at default 128 MB, minPartitions specified as 1000
splitSize = Math.max(minSize, Math.min(Sum of all files lengths to be read / minPartitions, blockSize)) = max( 0 , min(31 * 330 * 1024 * 1024/1000 ,128 * 1024 * 1024)) = 10726932 = 10.23M
一個(gè)文件分為 ceil(330/10.23) = 33塊 共計(jì):31 * 33 = 1023 共計(jì)分區(qū): 1023個(gè)
(d) 31 parquet files, 330 MB each, blocksize at default 128 MB, minPartitions not specified, ‘mapred.min.split.size’ set at 256 MB, No. of core equal to 10
splitSize = Math.max(minSize, Math.min(Sum of all files lengths to be read / minPartitions, blockSize)) = max( 256 , min(31 * 330 * 1024 * 1024/1 ,128 * 1024 * 1024)) =256M 330/256.0 = 2 , 31 * 2 = 62個(gè)分區(qū)
總結(jié):
分區(qū)的最佳數(shù)量是高效可靠的Spark應(yīng)用程序的關(guān)鍵。
浙公網(wǎng)安備 33010602011771號(hào)