Spark SQL五大關(guān)聯(lián)策略
1、五種連接策略
選擇連接策略的核心原則是盡量避免shuffle和sort的操作,因?yàn)檫@些操作性能開(kāi)銷(xiāo)很大,比較吃資源且耗時(shí),所以首選的連接策略是不需要shuffle和sort的hash連接策略。
?Broadcast Hash Join(BHJ):廣播散列連接
?Shuffle Hash Join(SHJ):洗牌散列連接
?Shuffle Sort Merge Join(SMJ):洗牌排列合并聯(lián)系
?Cartesian Product Join(CPJ):笛卡爾積連接
?Broadcast Nested Loop Join(BNLJ):廣播嵌套循環(huán)連接
2、連接影響因素
2.1、連接類(lèi)型是否為equi-join(等值連接)
等值連接是指一個(gè)連接條件中只包含“=”比較的連接,而非等值連接包含除“=”以外的任何比較,如“<、>、>=、<=”,由于非等值連接是對(duì)不確定值的范圍比較,需要嵌套循環(huán),所以只有CPJ和BMLJ兩種連接策略支持非等值連接,對(duì)于等值連接,所有連接策略都支持。
2.2、連接策略提示(Join strategy hint)
Spark SQL為開(kāi)發(fā)人員提供了通過(guò)連接提示對(duì)連接策略選擇進(jìn)行一些控制,共支持4種連接提示(Spark3.0.0版本)。
?BROADCAST
?SHUFFLE_MERGE
?SHUFFLE_HASH
?SHUFFLE_REPLICATE_NL
使用示例:SELECT
/*+ BROADCAST(table_B) */ *
FROM
table_A
INNER JOIN
table_B
ON
table_A.id = table_B.id
2.3、連接數(shù)據(jù)集的大小(Size of Join relations)
選擇連接策略最重要的因素是連接數(shù)據(jù)集的大小,是否可以選擇不需要shuffle和sort的基于hash的連接策略,就取決于連接中涉及的數(shù)據(jù)集的大小。
3、連接策略?xún)?yōu)先級(jí)

4、五種連接策略運(yùn)行原理
4.1、Broadcast Hash Join(BHJ):廣播散列連接
?主要分為兩個(gè)階段:
1.廣播階段:通過(guò)collect算子將小表數(shù)據(jù)拉到Driver端,再把整體的小表廣播致每個(gè)Executor端一份。
2.關(guān)聯(lián)階段:在每個(gè)Executor上進(jìn)行hash join,為較小的表通過(guò)join key創(chuàng)建hashedRelation作為build table,循環(huán)大表stream table通過(guò)join key關(guān)聯(lián)build table。
?限制條件:
1.被廣播的小表大小必須小于參數(shù):spark.sql.autoBroadcaseJoinThreshold,默認(rèn)為10M。
2.基表不能被廣播,比如left join時(shí),只能廣播右表。
3.數(shù)據(jù)集的總行數(shù)小于MAX_BROADCAST_TABLE_ROWS閾值,閾值被設(shè)置為3.41億行。

4.2、Shuffle Hash Join(SHJ):洗牌散列連接
?主要分為兩個(gè)階段:
1.洗牌階段:通過(guò)對(duì)兩張表分別按照join key分區(qū)洗牌,為了讓相同join key的數(shù)據(jù)分配到同一Executor中。
2.關(guān)聯(lián)階段:在每個(gè)Executor上進(jìn)行hash join,為較小的表通過(guò)join key創(chuàng)建hashedRelation作為build table,循環(huán)大表stream table通過(guò)join key關(guān)聯(lián)build table。
?限制條件:
1.小表大小必須小于參數(shù):spark.sql.autoBroadcaseJoinThreshold(默認(rèn)為10M) * shuffle分區(qū)數(shù)。
2.基表不能被廣播,比如left join時(shí),只能廣播右表。
3.較小表至少比較大表小3倍以上,否則性能收益未必大于Shuffle Sort Merge Join。

4.3、Shuffle Sort Merge Join(SMJ):洗牌排列合并聯(lián)系
?主要分為兩個(gè)階段:
1.洗牌階段:將兩張大表分別按照join key分區(qū)洗牌,為了讓相同join key的數(shù)據(jù)分配到同一分區(qū)中。
2.排序階段:對(duì)單個(gè)分區(qū)的兩張表分別進(jìn)行升序排序。
3.關(guān)聯(lián)階段:兩張有序表都可以作為stream table或build table,順序迭代stream table行,在build table順序逐行搜索,相同鍵關(guān)聯(lián),由于stream table或build table都是按連接鍵排序的,當(dāng)連接過(guò)程轉(zhuǎn)移到下一個(gè)stream table行時(shí),在build table中不必從第一個(gè)行搜索,只需從與最后一個(gè)stream table匹配行繼續(xù)搜索即可。
?限制條件:
1.連接鍵必須是可排序的。

4.4、Cartesian Product Join(CPJ):笛卡爾積連接
?主要分為兩個(gè)階段:
1.分區(qū)階段:將兩張大表分別進(jìn)行分片,再將兩個(gè)父分片a,b進(jìn)行笛卡爾積組裝子分片,子分片數(shù)量:a*b。
2.關(guān)聯(lián)階段:會(huì)對(duì)stream table和build table兩個(gè)表使用內(nèi)、外兩個(gè)嵌套的for循環(huán)依次掃描,通過(guò)關(guān)聯(lián)鍵進(jìn)行關(guān)聯(lián)。
?限制條件:
1.left join廣播右表,right join廣播左表,inner join廣播兩張表。

4.5、Broadcast Nested Loop Join(BNLJ):廣播嵌套循環(huán)連接
?主要分為兩個(gè)階段:
1.廣播階段:通過(guò)collect算子將小表數(shù)據(jù)拉到Driver端,再把整體的小表廣播致每個(gè)Executor端一份。
2.關(guān)聯(lián)階段:會(huì)對(duì)stream table和build table兩個(gè)表使用內(nèi)、外兩個(gè)嵌套的for循環(huán)依次掃描,通過(guò)關(guān)聯(lián)鍵進(jìn)行關(guān)聯(lián)。
?限制條件:
1.僅支持內(nèi)連接。
2.開(kāi)啟參數(shù):spark.sql.crossJoin.enabled=true。

作者:曲海龍
來(lái)源:京東云開(kāi)發(fā)者社區(qū) 轉(zhuǎn)載請(qǐng)注明來(lái)源
浙公網(wǎng)安備 33010602011771號(hào)