您好,登錄后才能下訂單哦!
本篇內容介紹了“Combiner怎么使用”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
1、combiner的工作位置: kv從緩沖區中溢寫到磁盤時可以使用combiner(只要設置,無條件使用) 每個MapTask的所有數據都從緩沖區寫到磁盤后,在進行歸并時可以使用combiner(滿足條件使用,溢寫次數>=3) 2、Combiner: 合并 目的就是在每個MapTask中將輸出的kv提前進行局部合并。 能夠降低map到reduce傳輸的kv對數量及 reduce最終處理的數據量. 3、Combiner使用限制: 在不改變業務邏輯的情況下才能使用combiner. --例如:求平均值時,就不宜使用 4、Combiner組件父類就是Reducer Combiner是在每一個MapTask所在的節點運行; Reducer是接收全局所有Mappei的輸出結果;
1、自定義Compiner類 /** * combiner作用: * 在mapTask進行溢寫時,對每一個mapTask輸出的數據提前進行局部匯總,減少寫進reduceTask的整體數據量 * 注意:自定義Combiner類,屬于MapTask階段(雖然它繼承Reducer) */ public class WordCountCombiner extends Reducer<Text, IntWritable,Text,IntWritable> { int count = 0; @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws Exception{ for (IntWritable value : values) { count+=value.get(); } context.write(key,new IntWritable(count)); } }
2、WordCountMapper public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> { private Text outk = new Text(); private IntWritable outv = new IntWritable(1); @Override protected void map(LongWritable key,Text value,Context context) throws Exception { // 獲取輸入到的一行數據 String lineData = value.toString(); // 提前分析知道,按照空格進行切割,得到每個單詞 String[] splitData = lineData.split(" "); // 遍歷數據,將切割得到的數據寫出 for (String str : splitData) { // 注意,這里得到的數據類型是String,需要轉為Text outk.set(str); context.write(outk,outv); } } }
3、WordCountReduce public class WordCountReduce extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable outv = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws Exception { // 定義一個變量,用來接收遍歷中次數匯總 int count = 0; // 直接讀取values,獲取到迭代器對象中記錄的每個單詞出現次數 for (IntWritable value : values) { // 因為得到的value對象是IntWritable對象,不可以直接進行加操作,所以要轉換為int count += value.get(); //get()方法轉為int } // 寫出計算之后的數據,對count類型進行轉換 outv.set(count); context.write(key,outv); } }
4、WordCountDriver public class WordCountDriver { public static void main(String[] args) throws Exception { // 1、獲取job對象 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 2、關聯jar,配置執行程序時,使用的具體驅動類 job.setJarByClass(WordCountDriver.class); // 3、關聯mapper 和 reducer job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReduce.class); // 4、設置mapper的輸出的key和value類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 5、設置程序最終輸出的key和value類型,如果有reducer // 就寫reducer輸出的kv類型,如果沒有reducer,就寫mapper輸出的kv類型. job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 設置自定義Combiner類 job.setCombinerClass(WordCountCombiner.class); // job.setCombinerClass(WordCountReduce.class);也能這樣用 job.setInputFormatClass(CombineTextInputFormat.class); CombineTextInputFormat.setMaxInputSplitSize(job,4194304); // 6、設置文件的輸入和輸出路徑 FileInputFormat.setInputPaths(job,new Path("D:\\io\\hadooptest\\combineinput")); //要求該路徑不能存在,交給mr程序創建 FileOutputFormat.setOutputPath(job,new Path("D:\\io\\hadooptest\\Combineroutput2")); // 7、提交job job.waitForCompletion(true); } }
①:Outputformat是一個接口,其內部定義兩個抽象方法 --RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,String name,Progressable progress): 該方法用來獲取RecordWriter對象,主負責數據的寫出操作. --void checkOutputSpecs(FileSystem ignored, JobConf job): 該方法用來檢測輸出路徑,當driver中的輸出路徑存在時,會由該方法的實現類拋出異常 //131行拋出異常("Output directory " + outDir + " already exists") ②:通過ctrl+h 查看當前接口的實現類如下圖 --TextOutputFormat(hadoop默認使用的寫出方式),按行寫出,內部重寫了getRecordWriter()方法 --SequenceFileOutputFormat(最終寫出的文件是二進制格式) --MultipleOutputFormat(子抽象類,其下還有具體實現方法)
![OutputFormat實現類](https://oscimg.oschina.net/oscnet/up- 777fe19a5bf6864396beac3aa83d8350e9e.png "OutputFormat實現類")
//1、LogMapper public class LogMapper extends Mapper<LongWritable, Text,Text, NullWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws Exception { context.write(value,NullWritable.get()); } }
//2、LogReducer public class LogReducer extends Reducer<Text, NullWritable,Text,NullWritable> { @Override protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws Exception{ for (NullWritable value : values) { context.write(key,NullWritable.get()); } } }
//3、MyOutPutFormat public class MyOutPutFormat extends FileOutputFormat<Text, NullWritable> { /** * 重寫getRecordWriter()方法,在內部自定義一個寫出類 * @param job * @return * @throws IOException * @throws InterruptedException */ public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws Exception { LogRecordWriter rw = new LogRecordWriter(job.getConfiguration()); return rw; } }
//4、LogRecordWriter /** * 自定義LogRecordWriter對象需要繼承RecordWriter類 * * 需求: * 將包含"luck"的日志數據寫到 D:/bigtools/luck.log * 將不包含"luck"的日志數據寫到 D:/bigtools/other.log */ public class LogRecordWriter extends RecordWriter { // 文件輸出路徑 private String luckPath = "D:/bigtools/luck.log"; private String otherPath = "D:/bigtools/other.log"; private FSDataOutputStream atguiguOut; private FSDataOutputStream otherOut; private FileSystem fs; /** * 初始化 * @param conf */ public LogRecordWriter(Configuration conf){ try { fs = FileSystem.get(conf); luckOut = fs.create(new Path(luckPath)); otherOut = fs.create(new Path(otherPath)); } catch (IOException e) { e.printStackTrace(); } } /** * 重寫write方法 * @param key * @param value * @throws IOException * @throws InterruptedException */ public void write(Object key, Object value) throws Exception { String log = key.toString(); if(log.contains("luck")){ luckOut.writeBytes(log + "\n"); }else{ otherOut.writeBytes(log + "\n"); } } /** * 關閉流 * @param context * @throws IOException * @throws InterruptedException */ public void close(TaskAttemptContext context) throws IOException, InterruptedException { IOUtils.closeStream(luckOut); IOUtils.closeStream(otherOut); } }
//5、LogDriver public class LogDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(LogDriver.class); job.setMapperClass(LogMapper.class); job.setReducerClass(LogReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 設置自定義輸出 job.setOutputFormatClass(MyOutPutFormat.class); FileInputFormat.setInputPaths(job,new Path("D:\\io\\hadooptest\\loginput")); FileOutputFormat.setOutputPath(job,new Path("D:\\io\\hadooptest\\logoutput")); job.waitForCompletion(true); } }
“Combiner怎么使用”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。