您好,登錄后才能下訂單哦!
作者|白松
關于Giraph 共有九個章節,本文第五個章節。
環境:在單機上(機器名:giraphx)啟動了2個workers。
輸入:SSSP文件夾,里面有1.txt和2.txt兩個文件。
1、在Worker向Master匯報健康狀況后,就開始等待Master創建InputSplit。
方法:每個Worker通過檢某個Znode節點是否存在,同時在此Znode上設置Watcher。若不存在,就通過BSPEvent的waitForever()方法釋放當前線程的鎖,陷入等待狀態。一直等到master創建該znode。此步驟位于BSPServiceWorker類中的startSuperStep方法中,等待代碼如下:
cdn.xitu.io/2019/8/8/16c6f1c19ae23057?w=558&h=454&f=png&s=237620">
2、Master調用createInputSplits()方法創建InputSplit。
在generateInputSplits()方法中,根據用戶設定的VertexInputFormat獲得InputSplits。代碼如下:
其中minSplitCountHint為創建split的最小數目,其值如下:
minSplitCountHint = Workers數目 * NUM_INPUT_THREADS
NUM_INPUT_THREADS表示 每個Input split loading的線程數目,默認值為1 。 經查證,在TextVertexValueInputFormat抽象類中的getSplits()方法中的minSplitCountHint參數被忽略。用戶輸入的VertexInputFormat繼承TextVertexValueInputFormat抽象類。
如果得到的splits.size小于minSplitCountHint,那么有些worker就沒被用上。
得到split信息后,要把這些信息寫到Zookeeper上,以便其他workers訪問。上面得到的split信息如下:
[hdfs://giraphx:9000/user/root/SSSP/1.txt:0+66, hdfs://giraphx:9000/user/root/SSSP/2.txt:0+46]
遍歷splits List,為每個split創建一個Znode,值為split的信息。如為split-0創建Znode,值為:hdfs://giraphx:9000/user/root/SSSP/1.txt:0+66
/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/0
為split-1創建znode(如下),值為:hdfs://giraphx:9000/user/root/SSSP/2.txt:0+46
/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/1
最后創建znode: /_hadoopBsp/job_201404102333_0013/_vertexInputSplitsAllReady 表示所有splits都創建好了。
3、Master根據splits創建Partitions。首先確定partition的數目。
BSPServiceMaster中的MasterGraphPartitioner<I.V,E,M>對象默認為HashMasterPartitioner。它的createInitialPartitionOwners()方法如下:
上面代碼中是在工具類PartitionUtils計算Partition的數目,計算公式如下:
partitionCount=PARTITION_COUNT_MULTIPLIER availableWorkerInfos.size() availableWorkerInfos.size() ,其中PARTITION_COUNT_MULTIPLIER表示Multiplier for the current workers squared,默認值為1 。
可見,partitionCount值為4(122)。創建的partitionOwnerList信息如下:
[(id=0,cur=Worker(hostname=giraphx, MRtaskID=1, port=30001),prev=null,ckpt_file=null),
(id=1,cur=Worker(hostname=giraphx, MRtaskID=2, port=30002),prev=null,ckpt_file=null),
(id=2,cur=Worker(hostname=giraphx, MRtaskID=1, port=30001),prev=null,ckpt_file=null),
(id=3,cur=Worker(hostname=giraphx, MRtaskID=2, port=30002),prev=null,ckpt_file=null)]
4、Master創建Znode:/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_partitionExchangeDir,用于后面的exchange partition。
5、Master最后在assignPartitionOwners()方法中
把masterinfo,chosenWorkerInfoList,partitionOwners等信息寫入Znode中(作為Znode的data),該Znode的路徑為: /_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_addressesAndPartitions 。
Master調用barrierOnWorkerList()方法開始等待各個Worker完成數據加載。調用關系如下:
barrierOnWorkerList中創建znode,path=/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir 。然后檢查該znode的子節點數目是否等于workers的數目,若不等于,則線程陷入等待狀態。后面某個worker完成數據加載后,會創建子node(如 /_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir/giraphx_1)來激活該線程繼續判斷。
6、當Master創建第5步的znode后,會激活worker。
每個worker從znode上讀出data,data包含masterInfo,WorkerInfoList和partitionOwnerList,然后各個worker開始加載數據。
把partitionOwnerList復制給BSPServiceWorker類中的workerGraphPartitioner(默認為HashWorkerPartitioner類型)對象的partitionOwnerList變量,后續每個頂點把根據vertexID通過workerGraphPartitioner對象獲取其對應的partitionOwner。
每個Worker從znode: /_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir獲取子節點,得到inputSplitPathList,內容如下:
[/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/1,
/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/0]
然后每個Worker創建N個InputsCallable線程讀取數據。N=Min(NUM_INPUT_THREADS,maxInputSplitThread),其中NUM_INPUT_THREADS默認值為1,maxInputSplitThread=(InputSplitSize-1/maxWorkers +1
那么,默認每個worker就是創建一個線程來加載數據。
在InputSplitsHandler類中的reserveInputSplit()方法中,每個worker都是遍歷inputSplitPathList,通過創建znode來保留(標識要處理)的split。代碼及注釋如下:
當用reserveInputSplit()方法獲取某個znode后,loadSplitsCallable類的loadInputSplit方法就開始通過該znode獲取其HDFS的路徑信息,然后讀入數據、重分布數據。
VertexInputSplitsCallable類的readInputSplit()方法如下:
7、每個worker加載完數據后,調用waitForOtherWorkers()方法等待其他workers都處理完split。
策略如下,每個worker在/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir目錄下創建子節點,后面追加自己的worker信息,如worker1、worker2創建的子節點分別如下:
/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir/giraphx_1
/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir/giraphx_2
創建完后,然后等待master創建/_hadoopBsp/job_201404102333_0013/_vertexInputSplitsAllDone。
8、從第5步驟可知,若master發現/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir下的子節點數目等于workers的總數目,就會在coordinateInputSplits()方法中創建
_hadoopBsp/job_201404102333_0013/_vertexInputSplitsAllDone,告訴每個worker,所有的worker都處理完了split。
9、最后就是就行全局同步。
master創建znode,path=/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir ,然后再調用barrierOnWorkerList方法檢查該znode的子節點數目是否等于workers的數目,若不等于,則線程陷入等待狀態。等待worker創建子節點來激活該線程繼續判斷。
每個worker獲取自身的Partition Stats,進入finishSuperStep方法中,等待所有的Request都被處理完;把自身的Aggregator信息發送給master;創建子節點,如/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir/giraphx_1,data為該worker的partitionStatsList和workerSentMessages統計量;
最后調用waitForOtherWorkers()方法等待master創建/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_superstepFinished 節點。
master發現/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir的子節點數目等于workers數目后,根據/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir子節點上的data收集每個worker發送的aggregator信息,匯總為globalStats。
Master若發現全局信息中(1)所有頂點都voteHalt且沒有消息傳遞,或(2)達到最大迭代次數 時,設置 globalStats.setHaltComputation(true)。告訴works結束迭代。
master創建/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_superstepFinished 節點,data為globalStats。告訴所有workers當前超級步結束。
每個Worker檢測到master創建/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_superstepFinished 節點后,讀出該znode的數據,即全局的統計信息。然后決定是否繼續下一次迭代。
10、同步之后開始下一個超級步。
11、master和workers同步過程總結。
(1)master創建znode A,然后檢測A的子節點數目是否等于workers數目,不等于就陷入等待。某個worker創建一個子節點后,就會喚醒master進行檢測一次。
(2)每個worker進行自己的工作,完成后,創建A的子節點A1。然后等待master創建znode B。
(3)若master檢測到A的子節點數目等于workers的數目時,創建Znode B
(4)master創建B 節點后,會激活各個worker。同步結束,各個worker就可以開始下一個超步。
本質是通過znode B來進行全局同步的。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。