Spark存儲原理 (核心篇 三)
目錄
- 存儲分析
- Shuffle分析
- 序列化和壓縮
- 共享變量
- 實例
http://www.rzrgm.cn/tgzhu/p/5822370.html
Spark 作為一個以擅長內存計算為優勢的計算引擎,內存管理方案是其非常重要的模塊; Spark的內存可以大體歸為兩類:execution和storage,前者包括shuffles、joins、sorts和aggregations所需內存,后者包括cache和節點間數據傳輸所需內存;在Spark 1.5和之前版本里,兩者是靜態配置的,不支持借用,spark1.6 對內存管理模塊進行了優化,通過內存空間的融合,消除以上限制,提供更好的性能。官方網站只是要求內存在8GB之上即可(Impala推薦要求機器配置在128GB), 但spark job運行效率主要取決于:數據量大小,內存消耗,內核數(確定并發運行的task數量)
目錄:
- 基礎知識
- spark1.5- 內存管理
- spark1.6 內存管理
基本知識:
- on-heap memory:Java中分配的非空對象都是由Java虛擬機的垃圾收集器管理的,也稱為堆內內存。虛擬機會定期對垃圾內存進行回收,在某些特定的時間點,它會進行一次徹底的回收(full gc)。徹底回收時,垃圾收集器會對所有分配的堆內內存進行完整的掃描,這意味著一個重要的事實——這樣一次垃圾收集對Java應用造成的影響,跟堆的大小是成正比的。過大的堆會影響Java應用的性能
- off-heap memory:堆外內存意味著把內存對象分配在Java虛擬機的堆以外的內存,這些內存直接受操作系統管理(而不是虛擬機)。這樣做的結果就是能保持一個較小的堆,以減少垃圾收集對應用的影響
- LRU Cache(Least Recently Used):LRU可以說是一種算法,也可以算是一種原則,用來判斷如何從Cache中清除對象,而LRU就是“近期最少使用”原則,當Cache溢出時,最近最少使用的對象將被從Cache中清除
- spark 源碼: https://github.com/apache/spark/releases
- scale ide for Intellij : http://plugins.jetbrains.com/plugin/?id=1347
Spark1.5- 內存管理:
- 1.6 版本引入了新的內存管理方案,配置參數: spark.memory.useLegacyMode 默認 false 表示使用新方案,true 表示使用舊方案, SparkEnv.scala 源碼 如下圖:
![]()
- 在staticMemoryManager.scala 類中查看構造類及內存獲取定義
![]()
- 通過代碼推斷,若設置了 spark.testing.memory 則以該配置的值作為 systemMaxMemory,否則使用 JVM 最大內存作為 systemMaxMemory。
- spark.testing.memory 僅用于測試,一般不設置,所以這里我們認為 systemMaxMemory 的值就是 executor 的最大可用內存
- Execution:用于緩存shuffle、join、sort和aggregation的臨時數據,通過spark.shuffle.memoryFraction配置
- spark.shuffle.memoryFraction:shuffle 期間占 executor 運行時內存的百分比,用小數表示。在任何時候,用于 shuffle 的內存總 size 不得超過這個限制,超出部分會 spill 到磁盤。如果經常 spill,考慮調大參數值
- spark.shuffle.safetyFraction:為防止 OOM,不能把 systemMaxMemory * spark.shuffle.memoryFraction 全用了,需要有個安全百分比
- 最終用于 execution 的內存量為:executor 最大可用內存* spark.shuffle.memoryFraction*spark.shuffle.safetyFraction,默認為 executor 最大可用內存 * 0.16
- execution內存被分配給JVM里的多個task線程。
- task間的execution內存分配是動態的,如果沒有其他tasks存在,Spark允許一個task占用所有可用execution內存
- storage內存分配分析過程與 Execution 一致,由上面的代碼得出,用于storage 的內存量為: executor 最大可用內存 * spark.storage.memoryFraction * spark.storage.safetyFraction,默認為 executor 最大可用內存 * 0.54
- 在 storage 中,有一部分內存是給 unroll 使用的,unroll 即反序列化 block,該部分占比由 spark.storage.unrollFraction 控制,默認為0.2
- 通過代碼分析,storage 和 execution 總共使用了 80% 的內存,剩余 20% 內存被系統保留了,用來存儲運行中產生的對象,該類型內存不可控.
小結:
- 這種內存管理方式的缺陷,即 execution 和 storage 內存表態分配,即使在一方內存不夠用而另一方內存空閑的情況下也不能共享,造成內存浪費,為解決這一問題,spark1.6 啟用新的內存管理方案UnifiedMemoryManager
- staticMemoryManager- jvm 堆內存分配圖如下
![]()
Spark1.6 內存管理:
從spark1.6開始,引入了新的內存管理方式-----統一內存管理(UnifiedMemoryManager),在統一內存管理下,spark一個executor中的jvm heap內存被劃分成如下圖:
![]()
- Reserved Memory,這一部分的內存是我們無法使用的部分,spark內部保留內存,會存儲一些spark的內部對象等內容。
- spark1.6默認的Reserved Memory大小是300MB。這部分大小是不允許我們使用者改變的。簡單點說就是我們在為executor申請內存后,有300MB是我們無法使用的。并且如果我們申請的executor的大小小于1.5 * Reserved Memory 即 < 450MB,spark會報錯:
- User Memory:用戶在程序中創建的對象存儲等一系列非spark管理的內存開銷都占用這一部分內存
- Spark Memory:該部分大小為 (JVM Heap Size - Reserved Memory) * spark.memory.fraction,其中的spark.memory.fraction可以是我們配置的(默認0.75),如下圖:
![]()
- 如果spark.memory.fraction配小了,我們的spark task在執行時產生數據時,包括我們在做cache時就很可能出現經常因為這部分內存不足的情況而產生spill到disk的情況,影響效率。采用官方推薦默認配置
- Spark Memory這一塊有被分成了兩個部分,Execution Memory 和 Storage Memory,這通過spark.memory.storageFraction來配置兩塊各占的大小(默認0.5,一邊一半),如圖:
![]()
- Storage Memory主要用來存儲我們cache的數據和臨時空間序列化時unroll的數據,以及broadcast變量cache級別存儲的內容
- Execution Memory則是spark Task執行時使用的內存(比如shuffle時排序就需要大量的內存)
- 為了提高內存利用率,spark針對Storage Memory 和 Execution Memory有如下策略:
- 一方空閑,一方內存不足情況下,內存不足一方可以向空閑一方借用內存
- 只有Execution Memory可以強制拿回Storage Memory在Execution Memory空閑時,借用的Execution Memory的部分內存(如果因強制取回,而Storage Memory數據丟失,重新計算即可)
- 如果Storage Memory只能等待Execution Memory主動釋放占用的Storage Memory空閑時的內存。(這里不強制取回,因為如果task執行,數據丟失就會導致task 失敗)
http://www.rzrgm.cn/liuliliuli2017/p/6809094.html
Spark核心概念之Shuffle
以reduceByKey為例解釋shuffle過程。

在沒有task的文件分片合并下的shuffle過程如下:(spark.shuffle.consolidateFiles=false)

fetch 來的數據存放到哪里?
剛 fetch 來的 FileSegment 存放在 softBuffer 緩沖區,經過處理后的數據放在內存 + 磁盤上。這里我們主要討論處理后的數據,可以靈活設置這些數據是“只用內存”還是“內存+磁盤”。如果spark.shuffle.spill = false就只用內存。由于不要求數據有序,shuffle write 的任務很簡單:將數據 partition 好,并持久化。之所以要持久化,一方面是要減少內存存儲空間壓力,另一方面也是為了 fault-tolerance。
shuffle之所以需要把中間結果放到磁盤文件中,是因為雖然上一批task結束了,下一批task還需要使用內存。如果全部放在內存中,內存會不夠。另外一方面為了容錯,防止任務掛掉。
存在問題如下:
產生的 FileSegment 過多。每個 ShuffleMapTask 產生 R(reducer 個數)個 FileSegment,M 個 ShuffleMapTask 就會產生 M * R 個文件。一般 Spark job 的 M 和 R 都很大,因此磁盤上會存在大量的數據文件。
緩沖區占用內存空間大。每個 ShuffleMapTask 需要開 R 個 bucket,M 個 ShuffleMapTask 就會產生 MR 個 bucket。雖然一個 ShuffleMapTask 結束后,對應的緩沖區可以被回收,但一個 worker node 上同時存在的 bucket 個數可以達到 cores R 個(一般 worker 同時可以運行 cores 個 ShuffleMapTask),占用的內存空間也就達到了cores× R × 32 KB。對于 8 核 1000 個 reducer 來說,占用內存就是 256MB。
為了解決上述問題,我們可以使用文件合并的功能。
在進行task的文件分片合并下的shuffle過程如下:(spark.shuffle.consolidateFiles=true)

可以明顯看出,在一個 core 上連續執行的 ShuffleMapTasks 可以共用一個輸出文件 ShuffleFile。先執行完的 ShuffleMapTask 形成 ShuffleBlock i,后執行的 ShuffleMapTask 可以將輸出數據直接追加到 ShuffleBlock i 后面,形成 ShuffleBlock i',每個 ShuffleBlock 被稱為 FileSegment。下一個 stage 的 reducer 只需要 fetch 整個 ShuffleFile 就行了。這樣,每個 worker 持有的文件數降為 cores× R。FileConsolidation 功能可以通過spark.shuffle.consolidateFiles=true來開啟。
Spark核心概念之Cache
val rdd1 = ... // 讀取hdfs數據,加載成RDD
rdd1.cache
val rdd2 = rdd1.map(...)
val rdd3 = rdd1.filter(...)
rdd2.take(10).foreach(println)
rdd3.take(10).foreach(println)
rdd1.unpersist
cache和unpersisit兩個操作比較特殊,他們既不是action也不是transformation。cache會將標記需要緩存的rdd,真正緩存是在第一次被相關action調用后才緩存;unpersisit是抹掉該標記,并且立刻釋放內存。只有action執行時,rdd1才會開始創建并進行后續的rdd變換計算。
cache其實也是調用的persist持久化函數,只是選擇的持久化級別為MEMORY_ONLY。
persist支持的RDD持久化級別如下:

需要注意的問題:
Cache或shuffle場景序列化時, spark序列化不支持protobuf message,需要java 可以serializable的對象。一旦在序列化用到不支持java serializable的對象就會出現上述錯誤。
Spark只要寫磁盤,就會用到序列化。除了shuffle階段和persist會序列化,其他時候RDD處理都在內存中,不會用到序列化。
Spark 資源調優
內存管理:

Executor的內存主要分為三塊:
第一塊是讓task執行我們自己編寫的代碼時使用,默認是占Executor總內存的20%;
第二塊是讓task通過shuffle過程拉取了上一個stage的task的輸出后,進行聚合等操作時使用,默認也是占Executor總內存的20%;
第三塊是讓RDD持久化時使用,默認占Executor總內存的60%。
每個task以及每個executor占用的內存需要分析一下。每個task處理一個partiiton的數據,分片太少,會造成內存不夠。
其他資源配置:

具體調優可以參考美團點評出品的調優文章:
posted on 2020-01-08 14:14 心有多大,世界就有多大 閱讀(1040) 評論(0) 收藏 舉報






浙公網安備 33010602011771號