Spark Programming--- Shuffle operations
http://spark.apache.org/docs/latest/rdd-programming-guide.html#shuffle-operations
一些spark的特定操作將會觸發被稱為shuffle的事件。Shuffle是Spark用于重新分布數據的機制,這樣可以在不同的分區來分組。這通常涉及到在executor和機器之間進行拷貝數據,所以shuffle是一個很復雜并且消耗高的操作。
背景
為了了解shuffle期間發生了什么,我們可以考慮reduceByKey操作作為例子。reduceByKey操作生成了一個新的RDD通過所有的單個鍵值組合為一個元組-關鍵字和針對與該關鍵字相關的所有值執行reduce函數的結果。這里的挑戰不是所有的值對一個單獨的鍵都在同一個分區上或者甚至說在一臺機器上,而是它們必須被重新分布來計算結果。
在Spark,數據通常不會跨分區分布到特定操作的必要位置。在計算中,一個單獨的任務將會在一個單獨的分區上操作-然而為了組織所有的數據來被一個的單獨reduceByKey 的reduce任務來執行,Spark需要來執行一個all-to-all操作。它必須讀取所有分區來找到所有鍵的值,然后將它們帶到一起跨分區來為每一個鍵計算最終的結果---這個被稱為shuffle。
盡管在每一個分區中的新的shuffled數據的元素集是很重要的,同樣分區自己的順序也很重要,而元素之間的順序就不是了。如果一個想要預測shuffle中的順序數據那么可以使用:
- mapPartitions 來排序每一個分區,比如,.sorted
- repartitionAndSortWithinPartitions 來有效分區同時同步重新分區。
- sortBy 創造一個全局的排序的RDD
可以引起一個shuffle 的操作包括:repartition 和 coalesce,ByKey的操作,除了counting之外的比如:groupByKey 和reduceByKey,以及join操作比如cogroup 和 join。
性能影響
Shuffle是一個昂貴的操作因為它涉及到磁盤I/O,數據序列化和網絡I/O。為了給shuffle組織數據,spark生成一系列任務-maps用于組織數據,以及一系列reduce任務來聚集它。這個命名系統來自于MapReduce而且并不直接和SparK的map,reduce操作有關。
在內部,單獨的map任務的結果會被保存在內存中直到它們不適用。然后這些結果會被根據目標分區排序并且寫向單一的文件。在reduce方面,任務讀取相關的排序塊。
一定的shuffle操作會消耗明顯的數量的堆內存因為它們使用的是在內存中的數據結構來組織記錄在傳輸之前或者之后。明顯的,reduceByKey和AggregateByKey創造了這些結構在map階段,以及 ‘Bykey的操作生成了它們在reduce階段。當數據不能放進內存中時,Spark將會將這些表散落到硬盤中,會引起而外的磁盤I/O和增加垃圾回收次數。
Shuffle同樣會生成大量的中間文件在磁盤中。從Spark1.3開始,這些文件被保存直到對應的RDDs不再被使用以及已經被垃圾回收了。這樣做是為了shuffle文件不需要被重新創造如果lineage被重新計算時。垃圾回收也許會發生只有在一段很長時間,如果這個應用保留了對RDD的引用或者如果GC沒有頻繁的發生。這意味著長期運行的spark任務也許會消耗大量的磁盤空間。這個零時的磁盤目錄會被spark.local.dir參數所指定。
Shuffle行為可以被調整通過一系列的參數。可以參考 Spark Configuration Guide.‘Shuffle Behavior’章節。
Shuffle Behavior
| 屬性名稱Property Name | 默認值Default | 含義Meaning |
|---|---|---|
| spark.reducer.maxSizeInFlight | 48m | 從每一個reduce任務中同步獲取的map輸出的最大值。由于每一個輸出需要我們創造一個緩存來接受它,這個代表了每個任務的固定的內存開銷,所以盡量保證它較小除非你有很多內存。 |
| spark.reducer.maxReqsInFlight | Int.MaxValue | 這個配置限制了任意給定點遠程請求獲取塊數。當集群中的主機數量增加的時候,它也許會導致一個非常大數量的內部連接到一到多個節點,引起worker在負載下失敗。通過允許它來限制獲取請求的數量,這個情況也許會緩解 |
| spark.reducer.maxBlocksInFlightPerAddress | Int.MaxValue | 這個配置限制了每一個從給定端口里的的reduce任務可以獲取的遠程端口數量。當一個大量的block被一個給定的地址在一次單獨獲取或者同步獲取所請求時,可能會沖垮服務的executor或者Node Manager。這個配置對于減少Node Manager的負載尤為有用當外部的shuffle是被允許的。你可以通過設定一個較低值來減輕這個情況。 |
| spark.maxRemoteBlockSizeFetchToMem | Long.MaxValue | 遠程的塊將會被獲取到磁盤中,當這個塊的大小超過了這個配置的值在byte單位上。這個用于避免一個巨大的請求占據了太多的內存。我們可以將這個配置為一個指定的值(比如,200M)。注意到這個配置將會影響到shuffle的獲取以及遠程塊獲取的塊管理。對于允許了外部shuffle服務的用戶,這個特性只會在外部shuffle服務版本高于Spark2。2時有效。 |
| spark.shuffle.compress | true | 是否壓縮map的輸出文件,通常是一個好想法。壓縮將會使用spark.io.compression.codec. |
| spark.shuffle.file.buffer | 32k | 對每一個shuffle文件輸出流的在內存中的緩存大小,單位是KiB除非有其他的特別指定。這些緩存減少了硬盤查找和系統調用創建中間shuffle文件的過程。 |
| spark.shuffle.io.maxRetries | 3 | (Netty only)最大自動重復嘗試的次數如果這個值沒有被設置為0.這個重試邏輯有助于穩定大型的shuffle在長時間的GC暫停或者暫時的網絡連接問題上。 |
| spark.shuffle.io.numConnectionsPerPeer | 1 | (Netty only) 節點之間的連接的重復使用為了減少大型集群中重復建立連接的情況。對于有很多硬盤和很少主機的集群,這個將會導致并發行不足以飽和所有硬盤,因此用戶可能會考慮增加這個值。 |
| spark.shuffle.io.preferDirectBufs | true | (Netty only) 堆外緩沖區在shuffle和緩存塊轉移期間被用于減少垃圾回收。對于對外緩存內存數量有限的環境,用戶也許想要關掉這個來強迫所有的來自于Netty的分配都是在堆上。 |
| spark.shuffle.io.retryWait | 5s | (Netty only) 在每一次重試直接需要等待多久。最大的延遲時間默認是15秒,maxRetries * retryWait. |
| spark.shuffle.service.enabled | false | 允許外部shuffle服務。這個服務保存了通過executor所寫的shuffle文件,這樣這個executor可以安全的被移除。這個配置必須被允許如果spark.dynamicAllocation.enabled是“true”。這個外部的shuffle服務必須被啟動。查看dynamic allocation configuration and setup documentation 來獲得更多信息。 |
| spark.shuffle.service.port | 7337 | 外部shuffle服務將會運行的端口。 |
| spark.shuffle.service.index.cache.size | 100m | 緩存條目限制在指定的內存占用空間中,以字節為單位 |
| spark.shuffle.maxChunksBeingTransferred | Long.MAX_VALUE | 在shuffle服務中同一時間最大允許傳輸的塊數量。注意到新來的連接將會被關閉如果達到了最大數量。這個客戶端將會嘗試重新連接根據shuffle的重試配置(see spark.shuffle.io.maxRetries and spark.shuffle.io.retryWait),如果這個限制也被達到了,那么這個任務將會失敗。 |
| spark.shuffle.sort.bypassMergeThreshold | 200 | (Advanced)在基于排序的shuffle管理中,避免合并排序數據如果這里沒有map-side的聚合和這里最多有配置的這么多的reduce分區。 |
| spark.shuffle.spill.compress | true | 是否壓縮溢出的數據在shuffle期間 |
| spark.shuffle.accurateBlockThreshold | 100 * 1024 * 1024 | 閥值是以bytes為單位,高于此值將準確記錄HighlyCompressedMapStatus中的shuffle塊的大小。這個用于幫助阻止OOM通過避免錯誤估計了shuffle塊大小當獲取了shuffle塊時。 |
| spark.shuffle.registration.timeout | 5000 | 注冊外部shuffle服務的超時時間,單位是毫秒 |
| spark.shuffle.registration.maxAttempts | 3 | 當注冊外部shuffle服務失敗的時候,我們會重復嘗試的最大次數 |
| spark.io.encryption.enabled | false | 允許IO編碼。目前支持所有的模式除了Mesos。當使用這個特性的時候,我們推薦RPC編碼。 |
| spark.io.encryption.keySizeBits | 128 | IO編碼的值大小單位為bit。支持的值有128,192和256. |
| spark.io.encryption.keygen.algorithm | HmacSHA1 | 當生成一個IO編碼鍵值時使用的算法。被支持的算法在Java Cryptography Architecture Standard Algorithm Name 文檔的KeyGenerator章節中被描述。 |
浙公網安備 33010602011771號