亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

MapTask流程是怎樣的

發布時間:2021-12-23 16:08:26 來源:億速云 閱讀:193 作者:iii 欄目:大數據

這篇文章主要講解了“MapTask流程是怎樣的”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“MapTask流程是怎樣的”吧!

MapTask流程源碼解讀

1、從job提交流程的24步,開始mapTask的流程分析,進入submitJob  --LocalJobRunner.java中的788行
Job job = new Job(JobID.downgrade(jobid), jobSubmitDir);  //創建一個可以真正執行的Job
該Job: LocalJobRunner$Job , 且是一個線程   $表示內部類
2、因為當前的Job對象是一個線程,所有執行線程要執行run方法,因此直接找到 LocalJobRunner的run方法進行查看
   --定位到537行
TaskSplitMetaInfo[] taskSplitMetaInfos = 
          SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir);
//讀取切片的metainfo信息,即提交job過程中在臨時目錄中生成的job.splitmetainfo文件
3、向下走斷點,定位到下方代碼  --547行
List<RunnableWithThrowable> mapRunnables = getMapTaskRunnables(
               taskSplitMetaInfos, jobId, mapOutputFiles);
//根據切片的metainfo信息,可以得出有多少個切片,再生成對應個數的Runnable對象.
每個Runnable對象對應一個線程,每一個MapTask運行在一個線程中(基于本地模式的分析)
Runnable : LocalJobRunnber$Job$MapTaskRunnable  ---聯想到線程
4、ExecutorService mapService = createMapExecutor();   //創建線程池對象
runTasks(mapRunnables, mapService, "map");// 將所有的LocalJobRunnber$Job$MapTaskRunnable對象提交給
線程池執行,進入到runTasks方法內部。		--LocalJobRunner中的466行
5、//每個線程負責一個Runnable執行,定位到每個Runnable內部的run方法,查看具體執行(以內部類的方式嵌套)
for (Runnable r : runnables) {
        service.submit(r);
   }
LocalJobRunnber$Job$MapTaskRunnable交給每個線程執行時,會執行到 
LocalJobRunnber$Job$MapTaskRunnable的run方法,因此接下來看
LocalJobRunnber$Job$MapTaskRunnable的run方法     --LocalJobRunner中的248行

MapTask流程是怎樣的

6、進入到run方法內部,定位到254行
MapTask map = new MapTask(systemJobFile.toString(), mapId, taskId,
            info.getSplitIndex(), 1); 
//創建MapTask對象   --在每一個線程中都會執行,會創建一個mapTask對象

7、進入map.run(localConf, Job.this); --271行 //執行MapTask的run方法,關聯到MapTask方法中的run MapTask流程是怎樣的

進入到MapTask的run方法內
首先進行分區設置
partitions = jobContext.getNumReduceTasks();
      if (partitions > 1) {
        partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
          ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
      } else {
        partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
          @Override
          public int getPartition(K key, V value, int numPartitions) {
            return partitions - 1;
          }
        };
      }

8、定位到MapTask中run方法的347行,并進入runNewMapper()方法,提前判斷下是否使用新的api
進入runNewMapper()方法,定位到MapTask的745行開始讀源碼
9、--反射的方式創建Mapper對象.  例如: WordCountMapper
	org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
      (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)

        ReflectionUtils.newInstance(taskContext.getMapperClass(), job);

   --反射的方式創建Inputformat對象, 例如:  TextInputFormat(默認)
     org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
      (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
	  
        ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);

    --獲取當前MapTask所負責的切片信息
     org.apache.hadoop.mapreduce.InputSplit split = null;
    	split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
		
        splitIndex.getStartOffset());

    --獲取RecordReader對象
     org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
      new NewTrackingRecordReader<INKEY,INVALUE>
        (split, inputFormat, reporter, taskContext);

10、向下讀取,定位到MapTask的782行 output = new NewOutputCollector(taskContext, job, umbilical, reporter);方法進入

11、定位到MapTask的710行
	collector = createSortingCollector(job, reporter);   //收集器對象,可以理解為緩沖區對象
12、進入到createSortingCollector方法,    --MapTask中的388行
13、collector.init(context);		--初始化緩沖區對象 collector: MapTask$MapOutputBuffer
14、進入到init方法中   --MapTask的968行
15、
①:定位到init方法的980行
--//獲取溢寫百分比 80%,通過mapreduce.map.sort.spill.percent參數來配置
 final float spillper =
        job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);

--//獲取緩沖區大小 100M,	通過 mapreduce.task.io.sort.mb 參數來配置
 final int sortmb = job.getInt(MRJobConfig.IO_SORT_MB,
          MRJobConfig.DEFAULT_IO_SORT_MB);

--//獲取排序對象  QuickSort.class, 只排索引
sorter = ReflectionUtils.newInstance(job.getClass(
                   MRJobConfig.MAP_SORT_CLASS, QuickSort.class,
                   IndexedSorter.class), job);
--//獲取key的比較器對象
	comparator = job.getOutputKeyComparator();
--//獲取key的序列化對象	k/v serialization  獲取kv的序列化對象
--//獲取計數器對象	output counters
--//compression  獲取編解碼器,進行壓縮操作
--//combiner 獲取Combiner對象,在溢寫及歸并可以使用combiner
--//spillThread.start(); 啟動溢寫線程  ,只有達到溢寫百分比才會發生溢寫操作
16、mapper.run(mapperContext);執行到Mapper對象中的run方法,例如WordCountMapper中的run方法
進入到mapper.run()方法內
執行 setup(context);	--143行
執行 map(context.getCurrentKey(), context.getCurrentValue(), context);	--146行,
進入到wordCount中的map()方法,是一個循環執行的過程
context.wirte(outK,outV);將map方法中處理好的kv寫出
執行cleanup(context);

感謝各位的閱讀,以上就是“MapTask流程是怎樣的”的內容了,經過本文的學習后,相信大家對MapTask流程是怎樣的這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

陆丰市| 昌黎县| 泰和县| 安乡县| 陆川县| 霍邱县| 佳木斯市| 兰考县| 苍梧县| 邹城市| 凤阳县| 瓮安县| 太仓市| 靖西县| 土默特左旗| 华宁县| 平顺县| 青田县| 彭阳县| 邯郸县| 磴口县| 镇平县| 光山县| 龙里县| 姚安县| 澄城县| 崇信县| 健康| 兰州市| 新巴尔虎左旗| 丰都县| 大竹县| 宁阳县| 鄂托克旗| 越西县| 云梦县| 宝坻区| 芦山县| 江西省| 高雄县| 抚远县|