您好,登錄后才能下訂單哦!
本篇內容介紹了“Hadoop和spark為何要對key進行排序”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
只要對hadoop
中mapreduce
的原理清楚的都熟知下面的整個流程運行原理,其中涉及到至少三次排序,分別是溢寫快速排序,溢寫歸并排序,reduce
拉取歸并排序,而且排序是默認的,即天然排序的,那么為什么要這么做的,設計原因是什么。先給個結論,為了整體更穩定,輸出滿足多數需求,前者體現在不是采用hashShuffle
而是sortShuffle
,后者體現在預計算,要知道排序后的數據,在后續數據使用時的會方便很多,比如體現索引的地方,如reduce
拉取數據時候。
在分析設計原因之前,先理解一下整個過程,在map
階段,根據預先定義的partition
規則進行分區,map
首先將輸出寫到緩存中,當緩存內容達到閾值時,將結果spill
到硬盤,每一次spill
都會在硬盤產生一個spill
文件,因此一個map task可能會產生多個spill
文件,其中在每次spill
的時候會對key
進行排序。接下來進入shuffle
階段,當map
寫出最后一個輸出,需要在map
端進行一次merge
操作,按照partition
和partition
內的key
進行歸并排序(合并+排序),此時每個partition
內按照key
值整體有序。然后開始第二次merge
,這次是在reduce
端,在此期間數據在內存和磁盤上都有,其實這個階段的merge
并不是嚴格意義上的排序,也是跟前面類似的合并+排序,只是將多個整體有序的文件merge
成一個大的文件,最終完成排序工作。分析完整個過程后,是不是覺得如果自己實現MapReduce
框架的話,考慮用HashMap
輸出map內容即可。
整個流程圖如下:
詳細步驟:
首先,讀取數據組件InputFormat
(默認TextInputFormat
)會通過getSplits
方法對輸?入?目錄中文件進行邏輯切?片規劃得到splits
,有多少個split
就對應啟動多少個MapTask
。split
與block
的對應關系默認是?對?。
將輸入文件切分為splits
之后,由RecordReader
對象(默認LineRecordReader
)進行讀取,以\n
作為分隔符,讀取?行數據,返回<key,value>
。Key
表示每?行行?首字符偏移值,value
表示這?行文本內容。
讀取split
返回<key,value>
,進?入?用戶自己繼承的Mapper
類中,執行用戶重寫的map
函數。RecordReader
讀取?行這里調用一次。
map
邏輯完之后,將map
的每條結果通過context.write
進?行行collect
數據收集。在collect
中,會先對其進行分區處理,默認使用HashPartitioner
。MapReduce
提供Partitioner
接口,它的作用就是根據key
或value
及reduce
的數量來決定當前的這對輸出數據最終應該交由哪個reduce task
處理。默認對key hash
后再以reduce task
數量量取模。默認的取模方式只是為了平均reduce
的處理能力,如果用戶自己對Partitioner
有需求,可以訂制并設置到job
上。
接下來,會將數據寫入內存,內存中這?片區域叫做環形緩沖區,緩沖區的作用是批量量收集map
結果,減少磁盤IO
的影響。我們的key/value
對以及Partition
的結果都會被寫?入緩沖區。當然寫?入之前,key
與value
值都會被序列列化成字節數組
環形緩沖區其實是一個數組,數組中存放著key
、value
的序列化數據和key
、value
的元數據信息,包括partition
、key
的起始位置、value
的起始位置以及value
的長度。環形結構是一個抽象概念。
緩沖區是有大小限制,默認是100MB
。當map task
的輸出結果很多時,就可能會撐爆內存,所以需要在一定條件下將緩沖區中的數據臨時寫?入磁盤,然后重新利利?用這塊緩沖區。這個從內存往磁盤寫數據的過程被稱為Spill
,中文可譯為溢寫。這個溢寫是由單獨線程來完成,不影響往緩沖區寫map結果的線程。溢寫線程啟動時不不應該阻?止
map的結果輸出,所以整個緩沖區有個溢寫的?比例例spill.percent
。這個?比例例默認是0.8
,也就是當緩沖區的數據已經達到閾值(buffer size * spillpercent = 100MB * 0.8 = 80MB)
,溢寫線程啟動,鎖定這80MB
的內存,執行溢寫過程Maptask
的輸出結果還可以往剩下的20MB
內存中寫,互不不影響、
當溢寫線程啟動后,需要對這80MB
空間內的key
做排序(Sort
)。排序是MapReduce
模型默認的?行行為!
如果job
設置過Combiner
,那么現在就是使?用Combiner
的時候了了。將有相同key
的key/value
對的value
加起來,減少溢寫到磁盤的數據量量。Combiner
會優化MapReduce
的中間結果,所以它在整個模型中會多次使用。
那哪些場景才能使?用Combiner
呢?從這?里里分析,Combiner
的輸出是Reducer
的輸?,Combiner
絕不不能改變最終的計算結果。Combiner
只應該?用于那種Reduce
的輸入key/value
與輸出key/value
類型完全一致,且不不影響最終結果的場景。?比如累加,最?大值等。Combiner
的使?用一定得慎重如果用的好,它對job
執?行行效率有幫助,反之會影響reduce
的最終結果
合并溢寫文件:每次溢寫會在磁盤上生成一個臨時文件(寫之前判斷是否有combiner
),如果map
的輸出結果真的很大,有多次這樣的溢寫發生,磁盤上相應的就會有多個臨時文件存在。當整個數據處理理結束之后開始對磁盤中的臨時文件進?行行merge
合并,因為最終文件只有一個,寫?磁盤,并且為這個文件提供了一個索文件,以記錄每個reduce
對應數據的偏移量量。
Reduce
?大致分為copy
、sort
、reduce
三個階段,重點在前兩個階段。copy
階段包含?一個 eventFetcher
來獲取已完成的map
列列表,由Fetcher線程去copy
數據,在此過程中會啟動兩個merge
線程,分別為inMemoryMerger
和onDiskMerger
,分別將內存中的數據merge
到磁盤和將磁盤中的數據進?merge
。待數據copy
完成之后,copy
階段就完成了,開始進?行行sort
階段,sort
階段主要是執?finalMerge
操作,純粹的sort
階段,完成之后就是reduce
階段,調?用?用戶定義的reduce
函數進?處理。 詳細步驟
簡單地拉取數據。Reduce
進程啟動一些數據copy
線程(Fetcher
),通過HTTP
方式請求maptask
獲取屬于自己的文件。
Merge
階段。這?里里的merge
如map
端的merge
動作,只是數組中存放的是不不同map
端copy
來的數值。Copy
過來的數據會先放入內存緩沖區中,這?里里的緩沖區大小要?比map
端的更更為靈活。merge
有三種形式:內存到內存;內存到磁盤;磁盤到磁盤。默認情況下第?一種形式不不啟?用。當內存中的數據量量到達一定閾值,就啟動內存到磁盤的merge
。與map
端類似,這也是溢寫的過程,這個過程中如果你設置有Combiner
,也是會啟?用的,然后在磁盤中生成了了眾多的溢寫文件。第二種merge方式?一直在運?行行,直到沒有map
端的數據時才結束,然后啟動第三種磁盤到磁盤的merge
方式生成最終的文件。
把分散的數據合并成一個?大的數據后,還會再對合并后的數據排序。對排序后的鍵值對調?用reduce
方法,鍵相等的鍵值對調?用一次reduce
方法,每次調?用會產生零個或者多個鍵值對,最后把這些輸出的鍵值對寫入到HDFS
文件中。
“Hadoop和spark為何要對key進行排序”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。