亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

(第4篇)hadoop之魂--mapreduce計算框架,讓收集的數據產生價值

發布時間:2020-07-09 11:55:26 來源:網絡 閱讀:1536 作者:I加加 欄目:大數據

通過前面的學習,大家已經了解了HDFS文件系統。有了數據,下一步就要分析計算這些數據,產生價值。接下來我們介紹Mapreduce計算框架,學習數據是怎樣被利用的。

Mapreduce計算框架

如果Hadoop比做一頭大象,那么MapReduce就是那頭大象的電腦。MapReduceHadoop核心編程模型。在Hadoop中,數據處理核心就是MapReduce程序設計模型。

本章內容:

1) MapReduce編程模型

2) MapReduce執行流程

3) MapReduce數據本地化

4) MapReduce工作原理

5) MapReduce錯誤處理機制

1. MapReduce編程模型

MapReduce的概念是從函數式變成語言中借來的,整個MapReduce計算過程分為Map階段Reduce階段,為映射和縮減階段這兩個獨立的階段實際上是兩個獨立的過程,Map過程和Reduce過程,Map中進行數據的讀取和預處理,之后預處理的結果發送到Reduce中進行合并

我們通過一個代碼案例,讓大家快速熟悉如何通過代碼,快速實現一個我們自己的MapReduce

案例分布式計算出一篇文章中的各個單詞出現的次數,也就是WordCount

1) 創建map.py文件,寫入以下代碼:

#!/usr/bin/env python

import sys

word_list = []

for line in sys.stdin:

    word_list = line.strip().split(' ')

    if len(word_list) <= 0:

        continue

    for word in word_list:

        w = word.strip()

        if len(w) <= 0:

            continue

        print '\t'.join([w, "1"])

該代碼主要工作是從文章數據源逐行讀取,文章中的單詞之間以空格分割

word_list = line.strip().split(' ')這塊代碼是將當前讀取的一整行數據按照空格分割,將分割后的結果存入word_list數組中,然后通過for word in word_list遍歷數組,取出每個單詞,后面追加“1”標識當前word出現1次。

2) 創建reduce.py,寫入以下代碼:

#!/usr/bin/env python

 

import sys

 

cur_word = None

sum_of_word = 0

 

for line in sys.stdin:

    ss = line.strip().split('\t')

    if len(ss) != 2:

        continue

    word = ss[0].strip()

    count = ss[1].strip()

 

    if cur_word == None:

        cur_word = word

 

    if cur_word != word:

        print '\t'.join([cur_word, str(sum_of_word)])

        sum_of_word = 0

        cur_word = word

 

    sum_of_word += int(count)

 

print '\t'.join([cur_word, str(sum_of_word)])

sum_of_word = 0

代碼針對map階段的數組進行匯總處理,mapreduce過程中默認存在shuffle partition分組機制,保證同一個word的記錄,會連續傳輸reduce中,所以reduce階段只需要對連續相同的word后面的技術進行累加求和即可。

3) 本地模擬測試腳本

]$ cat big.txt | python map.py | sort -k1 | python reduce.py

cat     1

run     3

see     2

spot    2

the     1

6) 腳本執行流程:

(第4篇)hadoop之魂--mapreduce計算框架,讓收集的數據產生價值

2. MapReduce執行流程

上面的例子屬于MapReduce計算框架的一般流程,經過整理總結

 (第4篇)hadoop之魂--mapreduce計算框架,讓收集的數據產生價值

1) 輸入和拆分

不屬于map和reduce的主要過程,但屬于整個計算框架消耗時間的一部分,該部分會為正式的map準備數據


分片(split)操作

split只是將源文件的內容分片形成一系列的InputSplit,每個InputSpilt中存儲著對應分片的數據信息(例如,文件塊信息、起始位置、數據長度、所在節點列表…),并不是將源文件分割成多個小文件,每個InputSplit都由一個mapper進行后續處理。


每個分片大小參數很重要的,splitSize是組成分片規則很重要的一個參數,該參數由三個值來確定:

minSize:splitSize的最小值,由mapred-site.xml配置文件中mapred.min.split.size參數確定。

maxSize:splitSize的最大值,由mapred-site.xml配置文件中mapreduce.jobtracker.split.metainfo.maxsize參數確定。

blockSize:HDFS中文件存儲的快大小,由hdfs-site.xml配置文件中dfs.block.size參數確定。

splitSize的確定規則:splitSize=max{minSize,min{maxSize,blockSize}}


數據格式化(Format)操作:

將劃分好的InputSplit格式化成鍵值對形式的數據。其中key為偏移量,value是每一行的內容。

值得注意的是,在map任務執行過程中,會不停的執行數據格式化操作,每生成一個鍵值對就會將其傳入map,進行處理。所以map和數據格式化操作并不存在前后時間差,而是同時進行的。

(第4篇)hadoop之魂--mapreduce計算框架,讓收集的數據產生價值


2) Map映射

Hadoop并行性質發揮的地方。根據用戶指定的map過程,MapReduce嘗試在數據所在機器上執行該map程序。HDFS中文件數據是復制多份的,所以計算將會選擇擁有此數據的最空閑的節點

這一部分,map內部具體實現過程,可以由用戶自定義。


3) Shuffle派發

Shuffle過程是指Mapper產生的直接輸出結果,經過一系列的處理,成為最終的Reducer直接輸入數據為止的整個過程。這是mapreduce的核心過程。該過程可以分為兩個階段:


Mapper端的Shuffle:Mapper產生的結果并不會直接寫入到磁盤中,而是先存儲在內存中,當內存中的數據量達到設定的閥值時,一次性寫入到本地磁盤中。并同時進行sort(排序)、combine(合并)、partition(分片)等操作。其中,sort是把Mapper產生的結果按照key值進行排序;combine是把key值相同的記錄進行合并;partition是把數據均衡的分配給Reducer。


Reducer端的Shuffle:由于Mapper和Reducer往往不在同一個節點上運行,所以Reducer需要從多個節點上下載Mapper的結果數據,并對這些數據進行處理,然后才能被Reducer處理。

 

 

4) Reduce縮減

Reducer接收形式的數據流,形成形式的輸出,具體的過程可以由用戶自定義,最終結果直接寫入hdfs。每個reduce進程會對應一個輸出文件,名稱以part-開頭。


3. MapReduce數據本地化(Data-Local

首先HDFSMapReduce是Hadoop的核心設計。對于HDFS,是存儲基礎,在數據層面上提供了海量數據存儲的支持。而MapReduce,是數據的上一層,通過編寫MapReduce程序對海量數據進行計算處理


前面HDFS章節中,知道NameNode是文件系統的名字節點進程DataNode是文件系統的數據節點進程。


MapReduce計算框架中負責計算任務調度的JobTracker對應HDFS的NameNode的角色,只不過一個負責計算任務調度,一個負責存儲任務調度


MapReduce計算框架中負責真正計算任務的TaskTracker對應到HDFS的DataNode角色,一個負責計算,一個負責管理存儲數據

考慮本地化原則”,一般,將NameNode和JobTracker部署到同一臺機器上,各個DataNode和TaskNode同樣部署到同一臺機器上


(第4篇)hadoop之魂--mapreduce計算框架,讓收集的數據產生價值  


這樣做的目的是map任務分配給含有該map處理的數據塊的TaskTracker上,同時將程序JAR包復制到該TaskTracker上來運行,這叫“運算移動,數據不移動”。而分配reduce任務時并不考慮數據本地化。


4. MapReduce工作原理

我們通過Client、JobTrask和TaskTracker的角度來分析MapReduce的工作原理:

(第4篇)hadoop之魂--mapreduce計算框架,讓收集的數據產生價值  

    首先在客戶端Client啟動一個作業(Job),JobTracker請求一個Job ID。將運行作業所需要的資源文件復制到HDFS上,包括MapReduce程序打包的JAR文件、配置文件和客戶端計算所得的輸入劃分信息。這些文件都存放在JobTracker專門為該作業創建的文件夾中,文件夾名為該作業的Job ID。JAR文件默認會有10個副本(mapred.submit.replication屬性控制);輸入劃分信息告訴了JobTracker應該為這個作業啟動多少個map任務等信息。

    

    JobTracker接收到作業后,將其放在一個作業隊列里,等待作業調度器對其進行調度當作業調度器根據自己的調度算法調度到該作業時,會根據輸入劃分信息為每個劃分創建一個map任務,并將map任務分配給TaskTracker執行。對于map和reduce任務,TaskTracker根據主機核的數量和內存的大小有固定數量的map槽和reduce槽。這里需要強調的是:map任務不是隨隨便便地分配給某個TaskTracker的,這里就涉及到面提到的數據本地化Data-Local

   

    TaskTracker每隔一段時間會給JobTracker發送一個心跳,告訴JobTracker它依然在運行,同時心跳中還攜帶著很多的信息,比如當前map任務完成的進度等信息。當JobTracker收到作業的最后一個任務完成信息時,便把該作業設置成“成功”。當JobClient查詢狀態時,它將得知任務已完成,便顯示一條消息給用戶。

(第4篇)hadoop之魂--mapreduce計算框架,讓收集的數據產生價值  

如果具體從map端和reduce端分析,可以參考上面的圖片,具體如下:

Map端流程


1) 每個輸入分片會讓一個map任務來處理,map輸出的結果會暫且放在一個環形內存緩沖區中(該緩沖區的大小默認為100M,由io.sort.mb屬性控制),當該緩沖區快要溢出時(默認為緩沖區大小的80%,由io.sort.spill.percent屬性控制),會在本地文件系統中創建一個溢出文件,將該緩沖區中的數據寫入這個文件。


2) 在寫入磁盤之前,線程首先根據reduce任務的數目將數據劃分為相同數目的分區,也就是一個reduce任務對應一個分區的數據。這樣做是為了避免有些reduce任務分配到大量數據,而有些reduce任務卻分到很少數據,甚至沒有分到數據的尷尬局面。其實分區就是對數據進行hash的過程。然后對每個分區中的數據進行排序,如果此時設置了Combiner,將排序后的結果進行Combine操作,這樣做的目的是讓盡可能少的數據寫入到磁盤。


3) map任務輸出最后一個記錄時,可能會有很多的溢出文件,這時需要將這些文件合并。合并的過程中會不斷地進行排序和Combine操作,目的有兩個:

盡量減少每次寫入磁盤的數據量;

盡量減少下一復制階段網絡傳輸的數據量。

最后合并成了一個已分區且已排序的文件。為了減少網絡傳輸的數據量,這里可以將數據壓縮,只要將mapred.compress.map.out設置為true就可以了。


4) 將分區中的數據拷貝給相對應的reduce任務。分區中的數據怎么知道它對應的reduce是哪個呢?其實map任務一直和其父TaskTracker保持聯系,而TaskTracker又一直和JobTracker保持心跳。所以JobTracker中保存了整個集群中的宏觀信息。只要reduce任務向JobTracker獲取對應的map輸出位置就可以了。


Reduce端流程:


1) Reduce會接收到不同map任務傳來的數據,并且每個map傳來的數據都是有序的。如果reduce端接受的數據量相當小,則直接存儲在內存中(緩沖區大小由mapred.job.shuffle.input.buffer.percent屬性控制,表示用作此用途的堆空間的百分比),如果數據量超過了該緩沖區大小的一定比例(由mapred.job.shuffle.merge.percent決定),則對數據合并后溢寫到磁盤中。


2) 隨著溢寫文件的增多,后臺線程會將它們合并成一個更大的有序的文件,這樣做是為了給后面的合并節省時間。其實不管在map端還是reduce端,MapReduce都是反復地執行排序,合并操作,所以排序是hadoop的靈魂。


3) 合并的過程中會產生許多的中間文件(寫入磁盤了),但MapReduce會讓寫入磁盤的數據盡可能地少,并且最后一次合并的結果并沒有寫入磁盤,而是直接輸入到reduce函數。

Map處理數據后,到Reduce得到數據之前,這個流程在MapReduce中可以看做是一個Shuffle的過程


在經過mapper的運行后,我們得知mapper的輸出是這樣一個key/value對。到底當前的key應該交由哪個reduce去做呢,是需要現在決定的。 MapReduce提供Partitioner接口,它的作用就是根據key或value及reduce的數量來決定當前的這對輸出數據最終應該交由哪個reduce task處理。默認對key 做hash后再以reduce task數量取模。默認的取模方式只是為了平均reduce的處理能力,如果用戶自己對Partitioner有需求,可以訂制并設置到job上。


5. MapReduce錯誤處理機制

MapReduce任務執行過程中出現的故障可以分為兩大類:硬件故障和任務執行失敗引發的故障。


1) 硬件故障

Hadoop Cluster中,只有一個JobTracker,因此,JobTracker本身是存在單點故障的。如何解決JobTracker的單點問題呢?我們可以采用主備部署方式,啟動JobTracker主節點的同時,啟動一個或多個JobTracker備用節點。當JobTracker主節點出現問題時,通過某種選舉算法,從備用的JobTracker節點中重新選出一個主節點。


機器故障除了JobTracker錯誤就是TaskTracker錯誤。TaskTracker故障相對較為常見,MapReduce通常是通過重新執行任務來解決該故障。


Hadoop集群中,正常情況下,TaskTracker會不斷的與JobTracker通過心跳機制進行通信。如果某TaskTracker出現故障或者運行緩慢,它會停止或者很少向JobTracker發送心跳。如果一個TaskTracker在一定時間內(默認是1分鐘)沒有與JobTracker通信,那么JobTracker會將此TaskTracker從等待任務調度的TaskTracker集合中移除。同時JobTracker會要求此TaskTracker上的任務立刻返回。如果此TaskTracker任務仍然在mapping階段的Map任務,那么JobTracker會要求其他的TaskTracker重新執行所有原本由故障TaskTracker執行的Map任務。如果任務是在Reduce階段的Reduce任務,那么JobTracker會要求其他TaskTracker重新執行故障TaskTracker未完成的Reduce任務。比如:一個TaskTracker已經完成被分配的三個Reduce任務中的兩個,因為Reduce任務一旦完成就會將數據寫到HDFS上,所以只有第三個未完成的Reduce需要重新執行。但是對于Map任務來說,即使TaskTracker完成了部分Map,Reduce仍可能無法獲取此節點上所有Map的所有輸出。所以無論Map任務完成與否,故障TaskTracker上的Map任務都必須重新執行。


2) 任務執行失敗引發的故障

在實際任務中,MapReduce作業還會遇到用戶代碼缺陷或進程崩潰引起的任務失敗等情況。用戶代碼缺陷會導致它在執行過程中拋出異常。此時,任務JVM進程會自動退出,并向TaskTracker父進程發送錯誤消息,同時錯誤消息也會寫入log文件,最后TaskTracker將此次任務嘗試標記失敗。對于進程崩潰引起的任務失敗,TaskTracker的監聽程序會發現進程退出,此時TaskTracker也會將此次任務嘗試標記為失敗。對于死循環程序或執行時間太長的程序,由于TaskTracker沒有接收到進度更新,它也會將此次任務嘗試標記為失敗,并殺死程序對應的進程。


在以上情況中,TaskTracker將任務嘗試標記為失敗之后會將TaskTracker自身的任務計數器減1,以便想JobTracker申請新的任務。TaskTracker也會通過心跳機制告訴JobTracker本地的一個任務嘗試失敗。JobTracker接到任務失敗的通知后,通過重置任務狀態,將其加入到調度隊列來重新分配該任務執行(JobTracker會嘗試避免將失敗的任務再次分配給運行失敗的TaskTracker)。如果此任務嘗試了4次(次數可以進行設置)仍沒有完成,就不會再被重試,此時整個作業也就失敗了。


  如果你認真學習了前面的文章,這時你已經了解了數據的存儲,和數據的計算,已經對MapReduce計算框架,有了一個清晰的認識。


      接下來我們就要開始學習應用于分布式應用的協作服務Zookeeper,它在hadoop中有起到了什么重要作用呢?

  (第4篇)hadoop之魂--mapreduce計算框架,讓收集的數據產生價值

   

如何用4個月學會Hadoop開發并找到年薪25萬工作?

 

免費分享一套17年最新Hadoop大數據教程100Hadoop大數據必會面試題

因為鏈接經常被和諧,需要的朋友請加微信 ganshiyun666 來獲取最新下載鏈接,注明“51CTO”


教程已幫助300+人成功轉型Hadoop開發,90%起薪超過20K,工資比之前翻了一倍。

百度Hadoop核心架構師親自錄制

內容包括0基礎入門、Hadoop生態系統、真實商業項目實戰3大部分。其中商業案例可以讓你接觸真實的生產環境,訓練自己的開發能力。

(第4篇)hadoop之魂--mapreduce計算框架,讓收集的數據產生價值

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

开远市| 巴彦县| 开化县| 龙门县| 历史| 涿州市| 西乡县| 平江县| 泗水县| 乳源| 乐山市| 永丰县| 元朗区| 临武县| 侯马市| 楚雄市| 图木舒克市| 石阡县| 景宁| 武邑县| 北碚区| 岚皋县| 运城市| 河北省| 海宁市| 义乌市| 瓦房店市| 勃利县| 绩溪县| 准格尔旗| 克拉玛依市| 遵化市| 游戏| 西充县| 双辽市| 时尚| 苍山县| 偏关县| 通山县| 日喀则市| 逊克县|