Hadoop系列008-HDFS的數據流
本人微信公眾號,歡迎掃碼關注!

HDFS的數據流
1 HDFS寫數據流程
1.1 剖析文件寫入

1)客戶端向namenode請求上傳文件,namenode檢查目標文件是否已存在,父目錄是否存在。
2)namenode返回是否可以上傳。
3)客戶端請求第一個 block上傳到哪幾個datanode服務器上。
4)namenode返回3個datanode節點,分別為dn1、dn2、dn3。
5)客戶端請求dn1上傳數據,dn1收到請求會繼續調用dn2,然后dn2調用dn3,將這個通信管道建立完成
6)dn1、dn2、dn3逐級應答客戶端
7)客戶端開始往dn1上傳第一個block(先從磁盤讀取數據放到一個本地內存緩存),以packet為單位,dn1收到一個packet就會傳給dn2,dn2傳給dn3;dn1每傳一個packet會放入一個應答隊列等待應答
8)當一個block傳輸完成之后,客戶端再次請求namenode上傳第二個block的服務器。(重復執行3-7步)
1.2 網絡拓撲概念

在本地網絡中,兩個節點被稱為“彼此近鄰”是什么意思?在海量數據處理中,其主要限制因素是節點之間數據的傳輸速率——帶寬很稀缺。這里的想法是將兩個節點間的帶寬作為距離的衡量標準。
節點距離:兩個節點到達最近的共同祖先的距離總和。
例如,假設有數據中心d1機架r1中的節點n1。該節點可以表示為/d1/r1/n1。利用這種標記,這里給出四種距離描述。
Distance(/d1/r1/n1, /d1/r1/n1)=0(同一節點上的進程)
Distance(/d1/r1/n1, /d1/r1/n2)=2(同一機架上的不同節點)
Distance(/d1/r1/n1, /d1/r3/n2)=4(同一數據中心不同機架上的節點)
Distance(/d1/r1/n1, /d2/r4/n2)=6(不同數據中心的節點)
1.3 機架感知(副本節點選擇)
1.3.1 官方地址
http://hadoop.apache.org/docs/r2.7.3/hadoop-project-dist/hadoop-common/RackAwareness.html
1.3.2 低版本Hadoop復本節點選擇

- 第一個復本在client所處的節點上。如果客戶端在集群外,隨機選一個。
- 第二個復本和第一個復本位于不相同機架的隨機節點上。
- 第三個復本和第二個復本位于相同機架,節點隨機。
1.3.3 Hadoop2.7.2副本節點選擇

- 第一個副本在client所處的節點上。如果客戶端在集群外,隨機選一個。
- 第二個副本和第一個副本位于相同機架,隨機節點。
- 第三個副本位于不同機架,隨機節點。
1.3.4 自定義機架感知
-
(0)環境準備
-
(a)數據節點的量
[rack1]:hadoop102、hadoop103
[rack2]:hadoop104、hadoop105
-
(b)增加一個數據節點
(1)克隆一個節點
(2)啟動新節點
(3)修改克隆的ip和主機名
(4)在hadoop102上ssh到新節點
(5)修改xsync.sh和xcall.sh文件
(6)修改hadoop102 slaves文件,再分發
-
-
(1)創建類實現DNSToSwitchMapping接口
public class MyDNSToSwichMapping implements DNSToSwitchMapping { // 傳遞的是客戶端的ip列表,返回機架感知的路徑列表 public List<String> resolve(List<String> names) { ArrayList<String> lists = new ArrayList<String>(); if (names != null && names.size() > 0) { for (String name : names) { int ip = 0; // 獲取ip地址 if (name.startsWith("hadoop")) { String no = name.substring(6); // hadoop102 ip = Integer.parseInt(no); } else if (name.startsWith("192")) { // 192.168.10.102 ip = Integer.parseInt(name.substring(name.lastIndexOf(".") + 1)); } // 定義機架 if (ip < 104) { lists.add("/rack1/" + ip); } else { lists.add("/rack2/" + ip); } } } // 把ip地址打印出來 try { FileOutputStream fos = new FileOutputStream("/home/atguigu/name.txt"); for (String name : lists) { fos.write((name + "\r\n").getBytes()); } fos.close(); } catch (Exception e) { e.printStackTrace(); } return lists; } public void reloadCachedMappings() { } public void reloadCachedMappings(List<String> names) { } } -
(2)配置core-site.xml
-
默認的:
<!-- Topology Configuration --> <property> <name>net.topology.node.switch.mapping.impl</name> <value>org.apache.hadoop.net.ScriptBasedMapping</value> </property> -
配置后的
<!-- Topology Configuration --> <property> <name>net.topology.node.switch.mapping.impl</name> <value>com.atguigu.hdfs.MyDNSToSwichMapping</value> </property>
-
-
(3)分發core-site.xml
xsync /opt/module/hadoop-2.7.2/etc/hadoop/core-site.xml -
(4)編譯程序,打成jar,分發到所有節點的hadoop的classpath下
cd /opt/module/hadoop-2.7.2/share/hadoop/common/lib xsync MyDNSSwitchToMapping.jar -
(5)重新啟動集群
-
(6)在名稱節點hadoop103主機上查看名稱
-
(7)查看結果
-
(1)在hadoop105節點上傳文件到hdfs文件系統,查看復本存放位置
![]()
-
(2)在hadoop102節點上傳文件到hdfs文件系統,查看復本存放位置
![]()
-
(3)結論
第一個副本在client所處的節點上。如果客戶端在集群外,隨機選一個。
第二個副本和第一個副本位于相同機架,隨機節點。
第三個副本位于不同機架,隨機節點。
-
2 HDFS讀數據流程

1)客戶端向namenode請求下載文件,namenode通過查詢元數據,找到文件塊所在的datanode地址。
2)挑選一臺datanode(就近原則,然后隨機)服務器,請求讀取數據。
3)datanode開始傳輸數據給客戶端(從磁盤里面讀取數據放入流,以packet為單位來做校驗)。
4)客戶端以packet為單位接收,先在本地緩存,然后寫入目標文件。
3 一致性模型
3.1 debug調試如下代碼
@Test
public void writeFile() throws Exception{
// 1 創建配置信息對象
Configuration configuration = new Configuration();
fs = FileSystem.get(configuration);
// 2 創建文件輸出流
Path path = new Path("hdfs://hadoop102:8020/user/atguigu/hello.txt");
FSDataOutputStream fos = fs.create(path);
// 3 寫數據
fos.write("hello".getBytes());
// fos.flush();
fos.hflush();
//
// fos.write("welcome to atguigu".getBytes());
// fos.hsync();
fos.close();
}
3.2 總結
- 寫入數據時,如果希望數據被其他client立即可見,調用如下方法
- FsDataOutputStream.hflus(); //清理客戶端緩沖區數據,被其他client立即可見
- FsDataOutputStream.hsync(); //清理客戶端緩沖區數據,被其他client不能立即可見



浙公網安備 33010602011771號