您好,登錄后才能下訂單哦!
p>首先編寫WordCountDriver:
package com.jym.hadoop.mr.demo;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* 這個程序相當于一個yarn集群的客戶端,
* 需要在此封裝我們的mr程序的相關運行參數,指定jar包,
* 最后提交給yarn
* */
public class WordcountDriver
{
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException
{
Configuration conf=new Configuration();
/*其實如果在本地運行MR程序其實不用配置下面的代碼程序,在MR默認下就是本地運行*/
/**下面這段代碼配置的是在本地模式下運行MR程序*/
/**是否運行為本地模式,就是看這個參數值是否為local,默認就是local;*/
//conf.set("mapreduce.framework.name", "local"); //在本地運行MR程序
//本地模式運行MR程序時,輸入輸出的數據可以在本地,也可以在hdfs上
//到底在哪里,就看以下兩行配置用哪一行了,默認是“file:///”
/**conf.set("fs.defaultFS", "hdfs://hadoop1:9000");*/ //使用的是HDFS系統
//conf.set("fs.defaultFS", "file:///"); //使用的是本地Windows磁盤
/**運行集群模式,就是把程序提交到yarn中去運行
* 要想運行為集群模式,以下3個參數要指定為集群上的值
* */
conf.set("mapreduce.framework.name", "yarn");
conf.set("yarn.resourcemanager.hostname", "hadoop1");
conf.set("fs.defaultFS", "hdfs://hadoop1:9000");
Job job = Job.getInstance(conf);
/**要想在Windows的Eclipse上運行程序,并提交到hadoop的YARN集群上需要指定jar包,如下:*/
/**job.setJar("c:/wc.jar");*/
//job.setJar("/home/hadoop/wc.jar"); //這種是將程序打包成jar包,放到指定的位置,缺乏靈活性,不建議使用;
//指定本程序的jar包所在的本地路徑
job.setJarByClass(WordcountDriver.class);
//指定本業務job要使用的mapper/reducer業務類
job.setMapperClass(WordcountMapper.class);
job.setReducerClass(WordcountReducerr.class);
//指定mapper輸出數據的kv類型;
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//指定最終輸出的數據的kv類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//指定需要使用的combiner,以及用哪一個類作為combiner的邏輯
/*job.setCombinerClass(WordcountCombiner.class);*/
job.setCombinerClass(WordcountReducerr.class);
/**因為combiner的工作原理通reducecer的作用是一樣的,所以直接反射調用reducerr類其實作用是一樣的*/
/**此處為之后為測試添加的*/
//如果不設置InputFormat,它默認使用的是TextInputFormat.class
/**job.setInputFormatClass(CombineTextInputFormatInputFormatInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);
*/
//指定job的輸入原始文件所在目錄
//FileInputFormat.setInputPaths(job, new Path("/wordcount/input")); //此處添加的路徑為HDFS文件系統的路徑;
FileInputFormat.setInputPaths(job, new Path(args[0])); //傳一個路徑參數
//指定job的輸出結果所在目錄
FileOutputFormat.setOutputPath(job, new Path(args[1])); //傳一個參數進來作為輸出的路徑參數
//將job中配置的相關參數,以及job所用的Java類所在的jar包,提交給yarn去運行;
/*job.submit(); */
boolean res = job.waitForCompletion(true);
System.exit(res?0:1);
}
}
其次編寫WordCountMapper:
package com.jym.hadoop.mr.demo;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
//這是一個簡單的MapReduce例子,進行單詞數量的統計操作;
import org.apache.hadoop.mapreduce.Mapper;
/**
* KEYIN:默認情況下,是mr框架所讀到的一行文本的起始偏移量,Long類型,但是在Hadoop中有更精簡的序列化接口,因此采用LongWritable類型;
* VALUEIN:默認情況下,是mr框架所讀到的一行文本的內容,String類型的,同上用Text(org.apache.hadoop.io.Text)類型;
* KEYOUT:是用戶自定義邏輯處理完成之后輸出數據中的key,在此處是單詞,為String類型,同上用Text類型;
* VALUEOUT:是用戶自定義邏輯處理完成之后輸出數據中的value,在此處是單詞數量,為Integer類型,同上用IntWritable類型;
* */
public class WordcountMapper extends Mapper
{
/**
* map階段的業務邏輯就寫在自定義的map()方法中,
* maptask會對每一行輸入數據調用一次我們自定義的map()方法;
* */
@Override //覆寫Mapper中的方法;
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
{
//將maptask傳給我們的文本內容先轉換成String類型
String line = value.toString();
//根據空格將這一行切分成單詞;
String[] words = line.split(" ");
//將單詞輸出為<單詞,1>
for(String word:words)
{
//將單詞作為key,將次數1作為value,以便于后續的數據分發,可以根據單詞分發,以便于相同單詞會分到相同的reduce task中;
context.write(new Text(word),new IntWritable(1)); //進行類型轉換一下;
}無錫×××醫院 https://yyk.familydoctor.com.cn/20612/
}
最后編寫WordCountReduceer:
package com.jym.hadoop.mr.demo;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
* KEYIN,VALUEIN應該對應mapper中的輸出的KEYOUT,VALUEOUT類型;
* KEYOUT是單詞
* VALUEOUT是總次數*/
public class WordcountReducerr extends Reducer
{
/**
* 例如:
*
* 輸入參數key,是一組相同單詞kv對的key
* */
@Override
protected void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException
{
int count= 0;
/* //采用迭代器的方式進行統計單詞的數量;
Iterator iterator = values.iterator();
while(iterator.hasNext())
{
count+=iterator.next().get(); //獲取key對應的value值
}
*/
//下面的for循環和上面注釋中的效果是一樣的;
for(IntWritable value:values)
{
count+=value.get();
}
//輸出統計結果
context.write(key, new IntWritable(count));
/**
* 默認情況下reduce task會將輸出結果放到一個文件中(最好是HDFS文件系統上的一個文件)
* */
}
}
然而還可以編寫一個Combiner類:
package com.jym.hadoop.mr.demo;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/*
* 此處的這個combiner其實不用自己編寫,因為combiner的工作原理同reducer的原理是一樣
* 的,故可以直接反射調用WordcountReducer類即可
* */
public class WordcountCombiner extends Reducer
{
@Override
protected void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException
{
}
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。