分布式系統Hadoop源碼閱讀與分析(一):作業調度器實現機制
上一篇博文對Hadoop的作業調度器進行了介紹,我們知道,JobTracker和TaskTracker是Hadoop作業調度過程中最核心的兩個部分,前者負責map/reduce作業的調度與分派,后者負責map/reduce作業的實際執行,它們之間通過RPC機制進行通訊。下面將對Hadoop 0.20.2版本中作業調度相關源碼進行分析,至于JobTracker和TaskTracker中與作業調度無關的源碼部分,并未進行詳細介紹。
1. JobTracker部分

1.1 JobTracker
JobTracker是作業調度的控制類,實現了InterTrackerProtocol和TaskTrackerManager接口(當然,還有其他接口,這里我們只關心調度相關的接口),維護一個TaskScheduler的實例,并托管其生命周期。
在JobTracker的主方法中,首先,構造JobTracker實例對象,在此過程中,TaskScheduler實例會伴隨JobTracker一起被構造,另外,通過RPC,構造RPC服務器interTrackerServer;當完成JobTracker的構造后,接著給JobTracker的TaskScheduler類型變量taskScheduler設置TaskTrackerManager實例字段為當前的JobTracker實例。然后,調用JobTracker的offerService方法開始提供服務。
JobTracker.heartbeat方法主要是TaskTracker端遠程調用時用到的方法,其主要作用就是分派具體任務(將Task封裝成LaunchTaskAction),并將封裝后的Task分發到TaskTracker端。
1.2 TaskScheduler
TaskScheduler是作業調度器的抽象基類,實現了Configurable接口,具體的實現有JobQueueTaskScheduler和LimitTasksPerJobTaskScheduler。
在TaskScheduler中,通過維護Configuration和TaskTrackerManager成員變量,實現對作業隊列中所有Job的調度過程,具體見assignTasks方法。
1.3 JobQueueTaskScheduler
JobQueueTaskScheduler繼承于TaskScheduler,為Hadoop中的默認調度器,實現FIFO調度隊列,作業隊列按優先級和提交時間進行排序。
1.4 LimitTasksPerJobTaskScheduler
LimitTasksPerJobTaskScheduler繼承于JobQueueTaskScheduler,在JobQueueTaskScheduler基礎上,可以對每個Job的Task總數作限制。
1.5 TaskTrackerManager
TaskTrackerManager接口用于管理運行中的Cluster的所有TaskTracker信息,它被JobTracker實現,同時供TaskScheduler調度使用。
1.6 InterTrackerProtocol
InterTrackerProtocol接口是JobTracker和TaskTracker之間通信的協議接口, JobTracker作為RPC調用的Server端,實現InterTrackerProtocol接口,供TaskTracker遠程調用。
1.7 Configurable
Configurable接口用于配置Configuration對象。
2.
TaskTracker部分

2.1 TaskTracker
Task的執行實際是由TaskTracker發起的,TaskTracker會定期(缺省為3秒鐘,參見MRConstants類中定義的HEARTBEAT_INTERVAL變量)與JobTracker進行一次通信,報告自己Task的執行狀態,接收JobTracker的指令等,TaskTracker里面會通過循環的方式查找。
2.2 TaskTrackerStatus
TaskTrackerStatus是TaskTracker的狀態類,記錄了TaskTracker的資源使用情況。
2.3 HeartbeatResponse
HeartbeatResponse是心跳回復信息類,記錄了經TaskTracker發出的心跳,由JobTracker處理后產生的回復信息。
2.4 TaskTrackerAction
TaskTrackerAction記錄TaskTracker的action,它有四個子類,分別是KillJobAction、KillTaskAction、LaunchTaskAction和CommitTaskAction。
2.5 TaskTracker$TaskInProgress
TaskInProgress是TaskTracker的內部類,它主要負責對每個執行中的Task任務的監控和具體調度。
2.6 TaskTracker$TaskLauncher
TaskLauncher是個繼承于Thread的TaskTracker的內部類,在這里面會維護一個TaskInProgress的鏈表:
private List<TaskInProgress> tasksToLaunch;
該列表中的每個TaskInProgress 實例對應一個TaskUnit任務。
該類的run方法才是主體關鍵之處,它會循環判斷是否tasksToLaunch中有新任務要做,有就從該列表中拿出來,然后去調用TaskTracker.startNewTask(TaskInprogress)去開啟一個新任務。
2.7 Task
Task為抽象類,代表任務,它有兩個子類:MapTask 和 ReduceTask。
2.8 TaskRunner
TaskRunner為Task的執行類,它有兩個子類:MapTaskRunner和ReduceTaskRunner。
3.
RPC過程

3.1 RPC.getServer()
RPC服務端接口。為指定協議的實例,在指定的地址和端口上啟動服務。
3.2 RPC.waitForProxy()
RPC客戶端接口,創建一個指定服務端的代理。
浙公網安備 33010602011771號