spark-sql優(yōu)化簡述
本文分享自天翼云開發(fā)者社區(qū)《spark-sql優(yōu)化簡述》,作者:徐****東
1、自適應(yīng)中reduce參數(shù)控制
spark.sql.adaptive.shuffle.targetPostShuffleInputSize用于控制任務(wù)Shuffle后的目標輸入大小(以字節(jié)為單位)。
spark.sql.adaptive.minNumPostShufflePartitions用于控制自適應(yīng)執(zhí)行中使用的shuffle后最小的分區(qū)數(shù),可用于控制最小并行度。
spark.sql.adaptive.maxNumPostShufflePartitions來控制Shuffle后分區(qū)的最大數(shù)量。
2、合理設(shè)置單partition讀取數(shù)據(jù)量
SET spark.sql.files.maxPartitionBytes=xxxx;
3、合理設(shè)置shuffle partition的數(shù)量
SET spark.sql.shuffle.partitions=xxxx
4、使用coalesce & repartition調(diào)整partition數(shù)量
SELECT /*+ COALESCE(3) */ * FROM EMP_TABLE
SELECT /*+ REPARTITION(3) */ * FROM EMP_TABLE
SELECT /*+ REPARTITION(c) */ * FROM EMP_TABLE
SELECT /*+ REPARTITION(3, dept_col) */ * FROM EMP_TABLE
SELECT /*+ REPARTITION_BY_RANGE(dept_col) */ * FROM EMP_TABLE
SELECT /*+ REPARTITION_BY_RANGE(3, dept_col) */ * FROM EMP_TABLE
5、使用broadcast join
6、開啟Adaptive Query Execution(Spark 3.0)
6.1、動態(tài)合并分區(qū): spark會根據(jù)分區(qū)的數(shù)據(jù)量將小數(shù)據(jù)量的多個分區(qū)合并成一個分區(qū),可以提高資源的利用率
spark.sql.adaptive.enabled: 是否開啟AQE優(yōu)化
spark.sql.adaptive.coalescePartitions.enabled: 是否開啟動態(tài)合并分區(qū)
spark.sql.adaptive.coalescePartitions.initialPartitionNum: 初始分區(qū)數(shù)
spark.sql.adaptive.advisoryPartitionSizeInBytes 合并分區(qū)的推薦目標大小
spark.sql.adaptive.coalescePartitions.minPartitionNum: 合并之后的最小分區(qū)數(shù)
當RDD的分區(qū)數(shù)處于spark.sql.adaptive.coalescePartitions.initialPartitionNum與spark.sql.adaptive.coalescePartitions.minPartitionNum范圍內(nèi)才會合并
spark.sql.adaptive.advisoryPartitionSizeInBytes: 合并分區(qū)之后,分區(qū)的數(shù)據(jù)量的預(yù)期大小
6.2、動態(tài)切換join策略: 在join的時候,會動態(tài)選擇性能最高的join策略,提高效率
spark.sql.adaptive.enabled: 是否開啟AQE優(yōu)化
spark.sql.adaptive.localShuffleReader.enabled:在不需要進行shuffle重分區(qū)時,嘗試使用本地shuffle讀取器。將sort-meger join 轉(zhuǎn)換為廣播join
6.3、動態(tài)申請資源: 當計算過程中資源不足會自動申請資源
spark.sql.adaptive.enabled: 是否開啟AQE優(yōu)化
spark.dynamicAllocation.enabled: 是否開啟動態(tài)資源申請
spark.dynamicAllocation.shuffleTracking.enabled: 是否開啟shuffle狀態(tài)跟蹤
6.4、動態(tài)join數(shù)據(jù)傾斜: join的時候如果出現(xiàn)了數(shù)據(jù)傾斜,會動態(tài)調(diào)整分區(qū)的數(shù)據(jù)量,優(yōu)化數(shù)據(jù)傾斜導(dǎo)致的性能問題。
spark.sql.adaptive.enabled: 是否開啟AQE優(yōu)化
傾斜的膨脹系數(shù):spark.sql.adaptive.skewJoin.skewedPartitionFactor:N
傾斜的最低閾值:spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes:M
拆分粒度,以字節(jié)為單位:spark.sql.adaptive.advisoryPartitionSizeInBytes
G [代表優(yōu)化之后,分區(qū)數(shù)數(shù)據(jù)的預(yù)期大小]
sparksql判斷出現(xiàn)數(shù)據(jù)傾斜的依據(jù)[需要兩個條件同時滿足]:
當某個分區(qū)處理的數(shù)據(jù)量>= N * 所有task處理數(shù)據(jù)量的中位數(shù)
當某個分區(qū)處理的數(shù)據(jù)量>= M
7、文件與分區(qū)
SET spark.sql.files.maxPartitionBytes=xxx //讀取文件的時候一個分區(qū)接受多少數(shù)據(jù);
spark.sql.files.openCostInBytes//文件打開的開銷,通俗理解就是小文件合并的閾值
8、CBO優(yōu)化
spark.sql.cbo.enabled: 是否開啟cbo優(yōu)化
spark.sql.cbo.joinReorder.enabled: 是否調(diào)整多表Join的順序
spark.sql.cbo.joinReorder.dp.threshold: 設(shè)置多表jion的表數(shù)量的閾值,一旦join的表數(shù)量超過該閾值則不優(yōu)化多表join的順序
9、hints優(yōu)化
hints預(yù)防主要用在分區(qū)和join上。
Partitioning Hints Types:COALESCE,REPARTITION,REPARTITION_BY_RANGE
Join Hints Types:BROADCAST,MERGE,SHUFFLE_HASH,SHUFFLE_REPLICATE_NL
SELECT /*+ COALESCE(3) */ * FROM t;
SELECT /*+ REPARTITION(3) */ * FROM t;
SELECT /*+ REPARTITION(c) */ * FROM t;
SELECT /*+ REPARTITION(3, c) */ * FROM t;
SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t;
SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t;
## Join Hints for broadcast join
SELECT /*+ BROADCAST(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
SELECT /*+ BROADCASTJOIN (t1) */ * FROM t1 left JOIN t2 ON t1.key = t2.key;
SELECT /*+ MAPJOIN(t2) */ * FROM t1 right JOIN t2 ON t1.key = t2.key;
-- Join Hints for shuffle sort merge join
SELECT /*+ SHUFFLE_MERGE(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
SELECT /*+ MERGEJOIN(t2) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
SELECT /*+ MERGE(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
## Join Hints for shuffle hash join
SELECT /*+ SHUFFLE_HASH(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
## Join Hints for shuffle-and-replicate nested loop join
SELECT /*+ SHUFFLE_REPLICATE_NL(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
## When different join strategy hints are specified on both sides of a join, Spark
## prioritizes the BROADCAST hint over the MERGE hint over the SHUFFLE_HASH hint
## over the SHUFFLE_REPLICATE_NL hint.
## Spark will issue Warning in the following example
## org.apache.spark.sql.catalyst.analysis.HintErrorLogger: Hint (strategy=merge)
## is overridden by another hint and will not take effect.
SELECT /*+ BROADCAST(t1), MERGE(t1, t2) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
10、緩存表
對于一條SQL語句中可能多次使用到的表,可以對其進行緩存,使用SQLContext.cacheTable(TableName)或者DataFrame.cache即可,SparkSQL會用內(nèi)存列存儲的格式進行表的緩存,然后SparkSQL就可以僅僅掃描需要使用的列,并且自動優(yōu)化壓縮,來最小化內(nèi)存的使用和GC的開銷,SQLContext.uncacheTable(tableName)可以將表從緩存中移除,使用SQLContext.setConf()設(shè)置,可以通過
spark.sql.inMemoryColumnarStorage.batchSize
這個參數(shù),默認10000,配置列存儲單位。
永久視圖 view:永久保存一段查詢語句的邏輯,而不是查詢語句的數(shù)據(jù),永久有效,查詢這個視圖,相當于查詢一個SQL語句,如果保存的查詢邏輯復(fù)雜,這查詢視圖也耗時長。支持重新覆蓋
create or replace view view1 as
臨時視圖 temporary view:只在當前會話生效,如果會話結(jié)束,則臨時視圖失效,支持重新覆蓋 create or replace temporary view temp_view1 as,類似于 SparkSQL 中的
DataFrame.createOrReplaceTempView('視圖名'),hive不支持這個語法
緩存表cache table:只在當前會話有效,將一段查詢結(jié)果集緩存到內(nèi)存,并賦予一個表名。
table:永久有效,保存數(shù)據(jù)結(jié)構(gòu)和數(shù)據(jù)本身到磁盤。
with as:當子查詢的嵌套層數(shù)太多時,可以用with as 增加可讀性。
11、group by優(yōu)化
為了提高 group by 查詢的性能,可以嘗試以下幾種方法:
僅選擇必要的字段進行 group by 操作,避免選擇過多的字段。
盡可能將 group by 字段類型保持一致,以減少數(shù)據(jù)轉(zhuǎn)換的開銷。
如果可能,可以將 group by 字段進行哈希分區(qū),以減少數(shù)據(jù)傳輸和處理的開銷。
如果使用的是字符串類型,可以考慮使用哈希函數(shù)來減少字符串比較的開銷。
12、優(yōu)化傾斜連接
數(shù)據(jù)偏斜會嚴重降低聯(lián)接查詢的性能。此功能通過將傾斜的任務(wù)拆分(按需復(fù)制)為大小大致相等的任務(wù)來動態(tài)處理排序合并聯(lián)接中的傾斜。同時啟用spark.sql.adaptive.enabled和spark.sql.adaptive.skewJoin.enabled配置時,此選項才生效。

浙公網(wǎng)安備 33010602011771號