hadoop yarn application 資源本地化機制
一. 前言
在YARN中, 分布式緩存是一種分布式文件分發與緩存機制,主要作用是將用戶應用程序執行時所需的外部文件資源自動透明地下
載并緩存到各個節點上, 從而省去了用戶手動部署這些文件的麻煩。
二. 工作流程

YARN分布式緩存工作流程具體如下:
步驟1 客戶端將應用程序所需的文件資源(外部字典、 JAR包、 二進制文件等) 提交到HDFS上。
步驟2 客戶端將應用程序提交到ResourceManager上。
步驟3 ResourceManager與某個NodeManager通信, 啟動應用程序ApplicationMaster,NodeManager收到命令后, 首先從HDFS下載文件(緩存) , 然后啟動ApplicationMaster。
步驟4 ApplicationMaster與ResourceManager通信, 以請求和獲取計算資源。
步驟5 ApplicationMaster收到新分配的計算資源后, 與對應的NodeManager通信, 以啟動任務。
步驟6 如果該應用程序第一次在該節點上啟動任務, 則NodeManager首先從HDFS上下載文件緩存到本地, 然后啟動任務。
步驟7 NodeManager后續收到啟動任務請求后, 如果文件已在本地緩存, 則直接運行任務, 否則等待文件緩存完成后再啟動。
各節點上的緩存文件由對應的NodeManager管理和維護。 NodeManager采用了一定的緩存置換算法定期清理失效文件 .
在Hadoop中, 分布式緩存并不是將文件緩存到集群中各個節點的內存中, 而是將文件緩存到各節點的本地磁盤上, 以便執行任務時直接從本地磁盤上讀取文件.
三.資源可見性與分類
分布式緩存機制是由各個NodeManager實現的, 主要功能是將應用程序需要的文件資源(一般是只讀的) 緩存到本地, 以方便后續任務運行。 資源緩存是用時觸發的, 也就是說, 由第一個用到該資源的任務觸發的, 后續同類任務無須再次進行緩存, 直接使用已經緩存好的即可
按照可見性(LocalResourceVisibility) ,NodeManager將資源分為三類:
? PUBLIC: 節點上所有用戶共享該資源, 只要有一個用戶的應用程將這些資源緩存到本地, 其他所有用戶的所有應用程序均可使用它們。
? PRIVATE: 節點上同一用戶的所有應用程序共享該資源, 一旦該用戶的第一個應用程序將之緩存到本地, 該用戶后續所有的應用程序均可共享該資源。
? APPLICATION: 節點上同一應用程序的所有Container共享, 其他用戶或者通用戶的其他程序不可使用該資源。 默認情況下, MapReduce作業的split元信息文件job.splitmetainfo和屬性文件job.xml的可見性是APPLICATION。
**按照資源類型(LocalResourceType) , NodeManager將資源分為三類: **
? ARCHIVE: 歸檔文件, 當前支持后綴為".jar"、 “.zip”、 “.tar.gz”、 “.tgz"和”.tar"的5種歸檔文件, NodeManager能自動在工作目錄中對這5類歸檔文件進行解壓縮(如果是后綴為".jar"的文件, 還可自動將其加到CLASSPATH中) , 方便用戶程序使用。
? FILE: 普通文件, NodeManager只是簡單地將這類文件下載到工作目錄中, 不做任何處理。
? PATTERN: 以上兩種類型的混合體, 有多種類型文件存在, 而用戶可通過一個正則表達式指定哪些屬于ARCHIVE文件, 需要自動解壓縮
YARN是通過比較resource、 timestamp、 type和pattern四個字段是否相同來判斷兩個資源請求是否相同的。 如果一個已經被緩存到各個節點上的文件被用戶修改了, 則下次使用時會自動觸發一次緩存更新, 以重新從HDFS上下載該文件。
message LocalResourceProto {
optional URLProto resource = 1; //通常是存在HDFS上的文件的路徑
optional int64 size = 2; //文件大小
optional int64 timestamp = 3; //最后修改時間
optional LocalResourceTypeProto type = 4; //資源類型
optional LocalResourceVisibilityProto visibility = 5; //資源可見性
optional string pattern = 6; //正則表達式, 當資源可見性為PATTERN時有用
optional bool should_be_uploaded_to_shared_cache = 7;
}
YARN MapReduce是采用目錄權限方式判斷資源可見性的, 如果一個HDFS文件的父目錄的用戶執行權限、 組執行權限和其他組執行權限都是打開的(比如可以為"drwxr-xr-x") , 則認為它具有PUBLIC可見性, 否則是PRIVATE可見性。 換句話說,如果你想將一個文件可見性設置為PUBLIC, 必須在運行MapReduce應用程序之前將它上傳到HDFS上, 并修改它在所目錄的權限。 需要注意的是, 每次運行時臨時由客戶端自動從本地上傳到HDFS上的文件默認全部是PRVIATE權限。
//添加歸檔文件 void addCacheArchive(URI uri, Configuration conf) void setCacheArchives(URI[] archives, Configuration conf) //添加普通文件 void addCacheFile(URI uri, Configuration conf) void setCacheFiles(URI[] files, Configuration conf) //將三方JAR包或者動態庫添加到classpath中 void addFileToClassPath(Path file, Configuration conf) //在任務工作目錄下建立文件軟連接 void createSymlink(Configuration conf)
ARN MapReduce客戶端從Configuration中解析出各個屬性之后(比如mapred.cache.files、 mapred.cache.archives、 mapred.job.classpath.files、mapred.create.symlink等) , 需將之轉換成Protocol Buffers對象的定義方式 (YARN MapReduce客戶端將作業配置文件job.xml、 split文件可見性設置為 APPLICATION)
/**
* 創建 Application Resource 資源文件
* @param fs
* @param p
* @param fileSymlink
* @param type
* @param viz
* @param uploadToSharedCache
* @return
* @throws IOException
*/
private LocalResource createApplicationResource(FileContext fs, Path p,
String fileSymlink, LocalResourceType type, LocalResourceVisibility viz,
Boolean uploadToSharedCache) throws IOException {
//
// 文件緩存限定
// 獲取本地文件
LocalResource rsrc = recordFactory.newRecordInstance(LocalResource.class);
// 獲取資源文件的狀態
FileStatus rsrcStat = fs.getFileStatus(p);
// We need to be careful when converting from path to URL to add a fragment
// so that the symlink name when localized will be correct.
// 限定路徑
Path qualifiedPath = fs.getDefaultFileSystem().resolvePath(rsrcStat.getPath());
URI uriWithFragment = null;
boolean useFragment = fileSymlink != null && !fileSymlink.equals("");
try {
if (useFragment) {
uriWithFragment = new URI(qualifiedPath.toUri() + "#" + fileSymlink);
} else {
uriWithFragment = qualifiedPath.toUri();
}
} catch (URISyntaxException e) {
throw new IOException(
"Error parsing local resource path."
+ " Path was not able to be converted to a URI: " + qualifiedPath,
e);
}
// 設置資源文件
rsrc.setResource(URL.fromURI(uriWithFragment));
// 設置資源文件 大小
rsrc.setSize(rsrcStat.getLen());
// 設置資源文件 時間戳
rsrc.setTimestamp(rsrcStat.getModificationTime());
// 設置資源文件 類型
rsrc.setType(type);
// 設置資源文件 可見性
rsrc.setVisibility(viz);
// // 設置資源文件 緩存
rsrc.setShouldBeUploadedToSharedCache(uploadToSharedCache);
return rsrc;
}
四.分布式緩存實現
按可見性YARN將資源分為3類, 分別是PUBLIC、 PRIVATE和APPLICATION, 不同可見性的資源開放給用戶的權限不同, 這是通過設置特殊的目錄位置和目錄權限實現的NodeManager采用輪詢分配策略將這三類資源存放在yarn.nodemanager.local-dirs指定的目錄列表中, 在每個目錄中, 資源將按照以下方式存放:
? PUBLIC資源:存放在${yarn.nodemanager.local-dirs}/filecache/目錄下, 每個資源將單獨存放在以一個隨機整數命名的目錄中, 且目錄的訪問權限均為755 (rwxr-xr-x)。
? PRIVATE資源: 存放在${yarn.nodemanager.local-dirs}/usercache/${user}/filecache/目錄下(其中, ${user}是應用程序提交者, 默認情況下, 均為NodeManager啟動者) , 每個資源將單獨存放在以一個隨機整數命名的目錄中,
且目錄的訪問權限均為710 (rwx–x—)。
? APPLICATION資源: 存放在${yarn.nodemanager.localdirs}/usercache/${user}/${appcache}/${appid}/filecache/目錄下(其中,${appid}是應用程序ID) , 每個資源將單獨存放在以一個隨機整數命名的目錄中, 且目錄的訪問權限均為710 (rwx–x—);

Container的工作目錄位于${yarn.nodemanager.localdirs}/usercache/${user}/${appcache}/${appid}/${containerid}目錄下(其中, ${containerid }是Container ID) , 它運行所需的外部資源, 比如JAR包、 字典文件等, 處于各個filecache
目錄中, 為了避免文件復制帶來性能影響, 它會建立一個到這些文件的軟連接。
不同可見性資源的緩存機制實現不同:
對于PUBLIC資源, 由公共服務ResourceLocalizationService中的一個公用線程PublicLocalizer下載, 它內部維護了一個線程池并行下載資源; 對于PRIVATE和APPLICATION資源, 則由公共服務ResourceLocalizationService中一個專門線程LocalizerRunner(一個Container對應一個LocalizerRunner線程) 下載, 同一個Container的所有資源是串行下載的 。
考慮到PRIVATE和APPLICATION資源通常是用戶或者應用程序私有的, 不允許其他用戶看到,因此對應的目錄需有嚴格的權限設置, 這是由ContainerExecutor組件設置的(目前有DefaultContainerExecutor和LinuxContainerExecutor兩種實現) 。
ResourceLocalizationService內部啟動了一個(實現了LocalizationProtocol協議的) RPC服務器, 而ContainerExecutor則啟動了一個資源下載客戶端ContainerLocalizer, 它不斷地從服務端獲取待下載資源信息(ResourceLocalizationService維護了各種待下載資源列表) , 然后下載到被設置了特定權限的目錄中
分布式緩存完成的主要功能是文件下載, 涉及大量的磁盤讀寫, 因此整個過程采用了異步并發模型以加快文件下載速度, 同時避免同步模型帶來的性能開銷。
為了避免緩存的文件過多導致磁盤“撐爆”, NodeManager會定期清理過期的緩存文件, 具體方法如下: 每隔一定時間yarn.nodemanager.localizer.cache.cleanup.interval-ms(單位是毫秒, 默認值是10×60×1000, 即10分鐘) 啟動一次清理工作, 確保每個緩存目錄中文件容量小于yarn.nodemanager.localizer.cache.target-size-mb(單位是MB, 默認是10240,即10GB) , 如果超過該值, 則采用LRU(Least Recently Used) 算法清除已不再使用的緩存文件, 直至文件容量低于設定值。

浙公網安備 33010602011771號